[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15527303#comment-15527303 ]
ASF GitHub Bot commented on FLINK-3874: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2430#discussion_r80783113 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java --- @@ -17,123 +17,79 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.test.util.SuccessException; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.junit.Test; import java.io.Serializable; -import java.util.HashSet; import java.util.Properties; -import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { +public 
abstract class KafkaTableSinkTestBase implements Serializable { - protected final static String TOPIC = "customPartitioningTestTopic"; - protected final static int PARALLELISM = 1; + protected final static String TOPIC = "testTopic"; protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class); + @Test public void testKafkaTableSink() throws Exception { - LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); - - createTestTopic(TOPIC, PARALLELISM, 1); - StreamExecutionEnvironment env = createEnvironment(); - - createProducingTopology(env); - createConsumingTopology(env); - - tryExecute(env, "custom partitioning test"); - deleteTestTopic(TOPIC); - LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); - } + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = createTableSink(); + kafkaTableSink.emitDataStream(dataStream); - private StreamExecutionEnvironment createEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - return env; + verify(dataStream).addSink(kafkaProducer); } - private void createProducingTopology(StreamExecutionEnvironment env) { - DataStream<Row> stream = env.addSource(new SourceFunction<Row>() { - private boolean running = true; - - @Override - public void run(SourceContext<Row> ctx) throws Exception { - long cnt = 0; - while (running) { - Row row = new Row(2); - row.setField(0, cnt); - row.setField(1, "kafka-" + cnt); - ctx.collect(row); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }) - .setParallelism(1); - - KafkaTableSink kafkaTableSinkBase = createTableSink(); 
- - kafkaTableSinkBase.emitDataStream(stream); + @Test + public void testCorrectProducerIsCreated() throws Exception { + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = spy(createTableSink()); + kafkaTableSink.emitDataStream(dataStream); + + verify(kafkaTableSink).createKafkaProducer( + eq(TOPIC), + eq(createSinkProperties()), + any(JsonRowSerializationSchema.class), + any(CustomPartitioner.class)); } - private void createConsumingTopology(StreamExecutionEnvironment env) { - DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema(); - - FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps); - - env.addSource(source).setParallelism(PARALLELISM) - .map(new RichMapFunction<Row, Integer>() { - @Override - public Integer map(Row value) { - return (Integer) value.productElement(0); - } - }).setParallelism(PARALLELISM) - - .addSink(new SinkFunction<Integer>() { - HashSet<Integer> ids = new HashSet<>(); - @Override - public void invoke(Integer value) throws Exception { - ids.add(value); - - if (ids.size() == 100) { - throw new SuccessException(); - } - } - }).setParallelism(1); + @Test + public void testConfigure() { + KafkaTableSink kafkaTableSink = createTableSink(); + KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); + assertNotSame(kafkaTableSink, newKafkaTableSink); + + assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); + assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes()); + assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } protected KafkaPartitioner<Row> createPartitioner() { return new CustomPartitioner(); } protected Properties createSinkProperties() { - return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings); + return new Properties(); } protected abstract KafkaTableSink createTableSink(); --- End diff -- I would add 
the required parameters for a KafkaJsonTableSink here (topic, props, partitioner). That will make the code a bit easier to understand, IMO. > Add a Kafka TableSink with JSON serialization > --------------------------------------------- > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Ivan Mushketyk > Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)