[
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15527303#comment-15527303
]
ASF GitHub Bot commented on FLINK-3874:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2430#discussion_r80783113
--- Diff:
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
---
@@ -17,123 +17,79 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
-import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
import
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.junit.Test;
import java.io.Serializable;
-import java.util.HashSet;
import java.util.Properties;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase
implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
- protected final static String TOPIC = "customPartitioningTestTopic";
- protected final static int PARALLELISM = 1;
+ protected final static String TOPIC = "testTopic";
protected final static String[] FIELD_NAMES = new String[] {"field1",
"field2"};
protected final static TypeInformation[] FIELD_TYPES =
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+ protected FlinkKafkaProducerBase<Row> kafkaProducer =
mock(FlinkKafkaProducerBase.class);
+
@Test
public void testKafkaTableSink() throws Exception {
- LOG.info("Starting
KafkaTableSinkTestBase.testKafkaTableSink()");
-
- createTestTopic(TOPIC, PARALLELISM, 1);
- StreamExecutionEnvironment env = createEnvironment();
-
- createProducingTopology(env);
- createConsumingTopology(env);
-
- tryExecute(env, "custom partitioning test");
- deleteTestTopic(TOPIC);
- LOG.info("Finished
KafkaTableSinkTestBase.testKafkaTableSink()");
- }
+ DataStream dataStream = mock(DataStream.class);
+ KafkaTableSink kafkaTableSink = createTableSink();
+ kafkaTableSink.emitDataStream(dataStream);
- private StreamExecutionEnvironment createEnvironment() {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setRestartStrategy(RestartStrategies.noRestart());
- env.getConfig().disableSysoutLogging();
- return env;
+ verify(dataStream).addSink(kafkaProducer);
}
- private void createProducingTopology(StreamExecutionEnvironment env) {
- DataStream<Row> stream = env.addSource(new
SourceFunction<Row>() {
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Row> ctx) throws
Exception {
- long cnt = 0;
- while (running) {
- Row row = new Row(2);
- row.setField(0, cnt);
- row.setField(1, "kafka-" + cnt);
- ctx.collect(row);
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- })
- .setParallelism(1);
-
- KafkaTableSink kafkaTableSinkBase = createTableSink();
-
- kafkaTableSinkBase.emitDataStream(stream);
+ @Test
+ public void testCorrectProducerIsCreated() throws Exception {
+ DataStream dataStream = mock(DataStream.class);
+ KafkaTableSink kafkaTableSink = spy(createTableSink());
+ kafkaTableSink.emitDataStream(dataStream);
+
+ verify(kafkaTableSink).createKafkaProducer(
+ eq(TOPIC),
+ eq(createSinkProperties()),
+ any(JsonRowSerializationSchema.class),
+ any(CustomPartitioner.class));
}
- private void createConsumingTopology(StreamExecutionEnvironment env) {
- DeserializationSchema<Row> deserializationSchema =
createRowDeserializationSchema();
-
- FlinkKafkaConsumerBase<Row> source =
kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
-
- env.addSource(source).setParallelism(PARALLELISM)
- .map(new RichMapFunction<Row, Integer>() {
- @Override
- public Integer map(Row value) {
- return (Integer)
value.productElement(0);
- }
- }).setParallelism(PARALLELISM)
-
- .addSink(new SinkFunction<Integer>() {
- HashSet<Integer> ids = new HashSet<>();
- @Override
- public void invoke(Integer value) throws
Exception {
- ids.add(value);
-
- if (ids.size() == 100) {
- throw new SuccessException();
- }
- }
- }).setParallelism(1);
+ @Test
+ public void testConfigure() {
+ KafkaTableSink kafkaTableSink = createTableSink();
+ KafkaTableSink newKafkaTableSink =
kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+ assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+ assertArrayEquals(FIELD_NAMES,
newKafkaTableSink.getFieldNames());
+ assertArrayEquals(FIELD_TYPES,
newKafkaTableSink.getFieldTypes());
+ assertEquals(new RowTypeInfo(FIELD_TYPES),
newKafkaTableSink.getOutputType());
}
protected KafkaPartitioner<Row> createPartitioner() {
return new CustomPartitioner();
}
protected Properties createSinkProperties() {
- return
FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings);
+ return new Properties();
}
protected abstract KafkaTableSink createTableSink();
--- End diff --
I would add the required parameter for a KafkaJsonTableSink here (topic,
props, partitioner). That will make the code a bit easier to understand, IMO.
> Add a Kafka TableSink with JSON serialization
> ---------------------------------------------
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)