Repository: flink Updated Branches: refs/heads/master b949d42d9 -> 5e30ba384
[FLINK-4745] [table] Convert KafkaTableSource test to unit tests This closes #2603. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e30ba38 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e30ba38 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e30ba38 Branch: refs/heads/master Commit: 5e30ba384934b08861a970744db64b5123cfeff8 Parents: b949d42 Author: twalthr <twal...@apache.org> Authored: Wed Oct 5 16:55:20 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon Oct 10 11:22:19 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/Kafka08ITCase.java | 65 ------------- .../kafka/Kafka08JsonTableSinkTest.java | 2 +- .../kafka/Kafka08JsonTableSourceTest.java | 45 +++++++++ .../connectors/kafka/Kafka09ITCase.java | 79 ---------------- .../kafka/Kafka09JsonTableSinkTest.java | 1 + .../kafka/Kafka09JsonTableSourceTest.java | 45 +++++++++ .../connectors/kafka/KafkaConsumerTestBase.java | 98 -------------------- .../kafka/KafkaTableSinkTestBase.java | 38 +++----- .../kafka/KafkaTableSourceTestBase.java | 77 +++++++++++++++ 9 files changed, 184 insertions(+), 266 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 467ccc5..1c69d78 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.curator.framework.CuratorFramework; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -33,7 +31,6 @@ import org.junit.Assert; import org.junit.Test; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; @@ -368,66 +365,4 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { curatorFramework.close(); } - - @Test - public void testJsonTableSource() throws Exception { - String topic = UUID.randomUUID().toString(); - - // Names and types are determined in the actual test method of the - // base test class. - Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource( - topic, - standardProps, - new String[] { - "long", - "string", - "boolean", - "double", - "missing-field"}, - new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }); - - // Don't fail on missing field, but set to null (default) - tableSource.setFailOnMissingField(false); - - runJsonTableSource(topic, tableSource); - } - - @Test - public void testJsonTableSourceWithFailOnMissingField() throws Exception { - String topic = UUID.randomUUID().toString(); - - // Names and types are determined in the actual test method of the - // base test class. - Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource( - topic, - standardProps, - new String[] { - "long", - "string", - "boolean", - "double", - "missing-field"}, - new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }); - - // Don't fail on missing field, but set to null (default) - tableSource.setFailOnMissingField(true); - - try { - runJsonTableSource(topic, tableSource); - fail("Did not throw expected Exception"); - } catch (Exception e) { - Throwable rootCause = e.getCause().getCause().getCause(); - assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index b1e6db9..446e1d7 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -28,7 +28,7 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { @Override protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { + final FlinkKafkaProducerBase<Row> kafkaProducer) { return new Kafka08JsonTableSink(topic, properties, partitioner) { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java new file mode 100644 index 0000000..a2d66ac --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { + return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) JsonRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer08.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index 16ddcdc..fd167a0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -17,16 +17,8 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.junit.Test; -import java.util.Properties; -import java.util.UUID; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class Kafka09ITCase extends KafkaConsumerTestBase { // ------------------------------------------------------------------------ @@ -126,75 +118,4 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { public void testMetrics() throws Throwable { runMetricsTest(); } - - @Test - public void testJsonTableSource() throws Exception { - String topic = UUID.randomUUID().toString(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - // Names and types are determined in the actual test method of the - // base test class. - Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource( - topic, - props, - new String[] { - "long", - "string", - "boolean", - "double", - "missing-field"}, - new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }); - - // Don't fail on missing field, but set to null (default) - tableSource.setFailOnMissingField(false); - - runJsonTableSource(topic, tableSource); - } - - @Test - public void testJsonTableSourceWithFailOnMissingField() throws Exception { - String topic = UUID.randomUUID().toString(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - // Names and types are determined in the actual test method of the - // base test class. - Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource( - topic, - props, - new String[] { - "long", - "string", - "boolean", - "double", - "missing-field"}, - new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }); - - // Don't fail on missing field, but set to null (default) - tableSource.setFailOnMissingField(true); - - try { - runJsonTableSource(topic, tableSource); - fail("Did not throw expected Exception"); - } catch (Exception e) { - Throwable rootCause = e.getCause().getCause().getCause(); - assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException); - } - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index bfdcf68..068640d 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -29,6 +29,7 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { @Override protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, final FlinkKafkaProducerBase<Row> kafkaProducer) { + return new Kafka09JsonTableSink(topic, properties, partitioner) { @Override protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java new file mode 100644 index 0000000..4a75f50 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { + return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) JsonRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer09.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 9c36b43..bafff4f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -787,104 +787,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } /** - * Runs a table source test with JSON data. - * - * The table source needs to parse the following JSON fields: - * - "long" -> number - * - "string" -> "string" - * - "boolean" -> true|false - * - "double" -> fraction - */ - public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception { - final ObjectMapper mapper = new ObjectMapper(); - - final int numElements = 1024; - final long[] longs = new long[numElements]; - final String[] strings = new String[numElements]; - final boolean[] booleans = new boolean[numElements]; - final double[] doubles = new double[numElements]; - - final byte[][] serializedJson = new byte[numElements][]; - - ThreadLocalRandom random = ThreadLocalRandom.current(); - - for (int i = 0; i < numElements; i++) { - longs[i] = random.nextLong(); - strings[i] = Integer.toHexString(random.nextInt()); - booleans[i] = random.nextBoolean(); - doubles[i] = random.nextDouble(); - - ObjectNode entry = mapper.createObjectNode(); - entry.put("long", longs[i]); - entry.put("string", strings[i]); - entry.put("boolean", booleans[i]); - entry.put("double", doubles[i]); - - serializedJson[i] = mapper.writeValueAsBytes(entry); - } - - // Produce serialized JSON data - createTestTopic(topic, 1, 1); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - StreamExecutionEnvironment env = StreamExecutionEnvironment - .createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - - env.addSource(new SourceFunction<byte[]>() { - @Override - public void run(SourceContext<byte[]> ctx) throws Exception { - for (int i = 0; i < numElements; i++) { - ctx.collect(serializedJson[i]); - } - } - - @Override - public void cancel() { - } - }).addSink(kafkaServer.getProducer( - topic, - new ByteArraySerializationSchema(), - props, - null)); - - // Execute blocks - env.execute(); - - // Register as table source - StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env); - tableEnvironment.registerTableSource("kafka", kafkaTableSource); - - Table result = tableEnvironment.ingest("kafka"); - - tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() { - - int i = 0; - - @Override - public void invoke(Row value) throws Exception { - assertEquals(5, value.productArity()); - assertEquals(longs[i], value.productElement(0)); - assertEquals(strings[i], value.productElement(1)); - assertEquals(booleans[i], value.productElement(2)); - assertEquals(doubles[i], value.productElement(3)); - assertNull(value.productElement(4)); - - if (i == numElements-1) { - throw new SuccessException(); - } else { - i++; - } - } - }); - - tryExecutePropagateExceptions(env, "KafkaTableSource"); - } - - /** * Serialization scheme forwarding byte[] records. */ private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> { http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index e46ca08..baddab1 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -38,39 +38,31 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase implements Serializable { - - private final static String TOPIC = "testTopic"; - private final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; - private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); - - private final KafkaPartitioner<Row> partitioner = new CustomPartitioner(); - private final Properties properties = createSinkProperties(); +public abstract class KafkaTableSinkTestBase { + + private static final String TOPIC = "testTopic"; + private static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; + private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); + private static final Properties PROPERTIES = createSinkProperties(); + // we have to mock FlinkKafkaProducerBase as it cannot be instantiated without Kafka @SuppressWarnings("unchecked") - private final FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class); + private static final FlinkKafkaProducerBase<Row> PRODUCER = mock(FlinkKafkaProducerBase.class); @Test @SuppressWarnings("unchecked") public void testKafkaTableSink() throws Exception { DataStream dataStream = mock(DataStream.class); - KafkaTableSink kafkaTableSink = createTableSink(); - kafkaTableSink.emitDataStream(dataStream); - - verify(dataStream).addSink(kafkaProducer); - } - - @Test - @SuppressWarnings("unchecked") - public void testCreatedProducer() throws Exception { - DataStream dataStream = mock(DataStream.class); KafkaTableSink kafkaTableSink = spy(createTableSink()); kafkaTableSink.emitDataStream(dataStream); + verify(dataStream).addSink(eq(PRODUCER)); + verify(kafkaTableSink).createKafkaProducer( eq(TOPIC), - eq(properties), + eq(PROPERTIES), any(getSerializationSchema()), - eq(partitioner)); + eq(PARTITIONER)); } @Test @@ -90,12 +82,12 @@ public abstract class KafkaTableSinkTestBase implements Serializable { protected abstract Class<SerializationSchema<Row>> getSerializationSchema(); private KafkaTableSink createTableSink() { - return createTableSink(TOPIC, properties, partitioner, kafkaProducer); + return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER); } private static Properties createSinkProperties() { Properties properties = new Properties(); - properties.setProperty("testKey", "testValue"); + properties.setProperty("bootstrap.servers", "localhost:12345"); return properties; } http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java new file mode 100644 index 0000000..2a281e8 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public abstract class KafkaTableSourceTestBase { + + private static final String TOPIC = "testTopic"; + private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" }; + private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }; + private static final Properties PROPERTIES = createSourceProperties(); + + @Test + public void testKafkaTableSource() { + KafkaTableSource kafkaTableSource = spy(createTableSource()); + StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class); + kafkaTableSource.getDataStream(env); + + verify(env).addSource(any(getFlinkKafkaConsumer())); + + verify(kafkaTableSource).getKafkaConsumer( + eq(TOPIC), + eq(PROPERTIES), + any(getDeserializationSchema())); + } + + protected abstract KafkaTableSource createTableSource(String topic, Properties properties, + String[] fieldNames, TypeInformation<?>[] typeInfo); + + protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema(); + + protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer(); + + private KafkaTableSource createTableSource() { + return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES); + } + + private static Properties createSourceProperties() { + Properties properties = new Properties(); + properties.setProperty("zookeeper.connect", "dummy"); + properties.setProperty("group.id", "dummy"); + return properties; + } +}