Repository: flink
Updated Branches:
  refs/heads/master b949d42d9 -> 5e30ba384


[FLINK-4745] [table] Convert KafkaTableSource test to unit tests

This closes #2603.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e30ba38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e30ba38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e30ba38

Branch: refs/heads/master
Commit: 5e30ba384934b08861a970744db64b5123cfeff8
Parents: b949d42
Author: twalthr <twal...@apache.org>
Authored: Wed Oct 5 16:55:20 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Mon Oct 10 11:22:19 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka08ITCase.java         | 65 -------------
 .../kafka/Kafka08JsonTableSinkTest.java         |  2 +-
 .../kafka/Kafka08JsonTableSourceTest.java       | 45 +++++++++
 .../connectors/kafka/Kafka09ITCase.java         | 79 ----------------
 .../kafka/Kafka09JsonTableSinkTest.java         |  1 +
 .../kafka/Kafka09JsonTableSourceTest.java       | 45 +++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java | 98 --------------------
 .../kafka/KafkaTableSinkTestBase.java           | 38 +++-----
 .../kafka/KafkaTableSourceTestBase.java         | 77 +++++++++++++++
 9 files changed, 184 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 467ccc5..1c69d78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,7 +31,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -368,66 +365,4 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
                curatorFramework.close();
        }
-
-       @Test
-       public void testJsonTableSource() throws Exception {
-               String topic = UUID.randomUUID().toString();
-
-               // Names and types are determined in the actual test method of 
the
-               // base test class.
-               Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
-                               topic,
-                               standardProps,
-                               new String[] {
-                                               "long",
-                                               "string",
-                                               "boolean",
-                                               "double",
-                                               "missing-field"},
-                               new TypeInformation<?>[] {
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO,
-                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO });
-
-               // Don't fail on missing field, but set to null (default)
-               tableSource.setFailOnMissingField(false);
-
-               runJsonTableSource(topic, tableSource);
-       }
-
-       @Test
-       public void testJsonTableSourceWithFailOnMissingField() throws 
Exception {
-               String topic = UUID.randomUUID().toString();
-
-               // Names and types are determined in the actual test method of 
the
-               // base test class.
-               Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
-                               topic,
-                               standardProps,
-                               new String[] {
-                                               "long",
-                                               "string",
-                                               "boolean",
-                                               "double",
-                                               "missing-field"},
-                               new TypeInformation<?>[] {
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO,
-                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO });
-
-               // Don't fail on missing field, but set to null (default)
-               tableSource.setFailOnMissingField(true);
-
-               try {
-                       runJsonTableSource(topic, tableSource);
-                       fail("Did not throw expected Exception");
-               } catch (Exception e) {
-                       Throwable rootCause = 
e.getCause().getCause().getCause();
-                       assertTrue("Unexpected root cause", rootCause 
instanceof IllegalStateException);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index b1e6db9..446e1d7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -28,7 +28,7 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
        @Override
        protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
-                               final FlinkKafkaProducerBase<Row> 
kafkaProducer) {
+                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
                return new Kafka08JsonTableSink(topic, properties, partitioner) 
{
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
new file mode 100644
index 0000000..a2d66ac
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+               return new Kafka08JsonTableSource(topic, properties, 
fieldNames, typeInfo);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) JsonRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer08.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 16ddcdc..fd167a0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -17,16 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.junit.Test;
 
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
        // 
------------------------------------------------------------------------
@@ -126,75 +118,4 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
        public void testMetrics() throws Throwable {
                runMetricsTest();
        }
-
-       @Test
-       public void testJsonTableSource() throws Exception {
-               String topic = UUID.randomUUID().toString();
-
-               Properties props = new Properties();
-               props.putAll(standardProps);
-               props.putAll(secureProps);
-
-               // Names and types are determined in the actual test method of 
the
-               // base test class.
-               Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
-                               topic,
-                               props,
-                               new String[] {
-                                               "long",
-                                               "string",
-                                               "boolean",
-                                               "double",
-                                               "missing-field"},
-                               new TypeInformation<?>[] {
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO,
-                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO });
-
-               // Don't fail on missing field, but set to null (default)
-               tableSource.setFailOnMissingField(false);
-
-               runJsonTableSource(topic, tableSource);
-       }
-
-       @Test
-       public void testJsonTableSourceWithFailOnMissingField() throws 
Exception {
-               String topic = UUID.randomUUID().toString();
-
-               Properties props = new Properties();
-               props.putAll(standardProps);
-               props.putAll(secureProps);
-
-               // Names and types are determined in the actual test method of 
the
-               // base test class.
-               Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
-                               topic,
-                               props,
-                               new String[] {
-                                               "long",
-                                               "string",
-                                               "boolean",
-                                               "double",
-                                               "missing-field"},
-                               new TypeInformation<?>[] {
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO,
-                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO });
-
-               // Don't fail on missing field, but set to null (default)
-               tableSource.setFailOnMissingField(true);
-
-               try {
-                       runJsonTableSource(topic, tableSource);
-                       fail("Did not throw expected Exception");
-               } catch (Exception e) {
-                       Throwable rootCause = 
e.getCause().getCause().getCause();
-                       assertTrue("Unexpected root cause", rootCause 
instanceof IllegalStateException);
-               }
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index bfdcf68..068640d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -29,6 +29,7 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
        @Override
        protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
                        final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
                return new Kafka09JsonTableSink(topic, properties, partitioner) 
{
                        @Override
                        protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
new file mode 100644
index 0000000..4a75f50
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+               return new Kafka09JsonTableSource(topic, properties, 
fieldNames, typeInfo);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) JsonRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer09.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 9c36b43..bafff4f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -787,104 +787,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        }
 
        /**
-        * Runs a table source test with JSON data.
-        *
-        * The table source needs to parse the following JSON fields:
-        * - "long" -> number
-        * - "string" -> "string"
-        * - "boolean" -> true|false
-        * - "double" -> fraction
-        */
-       public void runJsonTableSource(String topic, KafkaTableSource 
kafkaTableSource) throws Exception {
-               final ObjectMapper mapper = new ObjectMapper();
-
-               final int numElements = 1024;
-               final long[] longs = new long[numElements];
-               final String[] strings = new String[numElements];
-               final boolean[] booleans = new boolean[numElements];
-               final double[] doubles = new double[numElements];
-
-               final byte[][] serializedJson = new byte[numElements][];
-
-               ThreadLocalRandom random = ThreadLocalRandom.current();
-
-               for (int i = 0; i < numElements; i++) {
-                       longs[i] = random.nextLong();
-                       strings[i] = Integer.toHexString(random.nextInt());
-                       booleans[i] = random.nextBoolean();
-                       doubles[i] = random.nextDouble();
-
-                       ObjectNode entry = mapper.createObjectNode();
-                       entry.put("long", longs[i]);
-                       entry.put("string", strings[i]);
-                       entry.put("boolean", booleans[i]);
-                       entry.put("double", doubles[i]);
-
-                       serializedJson[i] = mapper.writeValueAsBytes(entry);
-               }
-
-               // Produce serialized JSON data
-               createTestTopic(topic, 1, 1);
-
-               Properties props = new Properties();
-               props.putAll(standardProps);
-               props.putAll(secureProps);
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment
-                               .createRemoteEnvironment("localhost", 
flinkPort);
-               env.getConfig().disableSysoutLogging();
-
-               env.addSource(new SourceFunction<byte[]>() {
-                       @Override
-                       public void run(SourceContext<byte[]> ctx) throws 
Exception {
-                               for (int i = 0; i < numElements; i++) {
-                                       ctx.collect(serializedJson[i]);
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               }).addSink(kafkaServer.getProducer(
-                               topic,
-                               new ByteArraySerializationSchema(),
-                               props,
-                               null));
-
-               // Execute blocks
-               env.execute();
-
-               // Register as table source
-               StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.getTableEnvironment(env);
-               tableEnvironment.registerTableSource("kafka", kafkaTableSource);
-
-               Table result = tableEnvironment.ingest("kafka");
-
-               tableEnvironment.toDataStream(result, Row.class).addSink(new 
SinkFunction<Row>() {
-
-                       int i = 0;
-
-                       @Override
-                       public void invoke(Row value) throws Exception {
-                               assertEquals(5, value.productArity());
-                               assertEquals(longs[i], value.productElement(0));
-                               assertEquals(strings[i], 
value.productElement(1));
-                               assertEquals(booleans[i], 
value.productElement(2));
-                               assertEquals(doubles[i], 
value.productElement(3));
-                               assertNull(value.productElement(4));
-
-                               if (i == numElements-1) {
-                                       throw new SuccessException();
-                               } else {
-                                       i++;
-                               }
-                       }
-               });
-
-               tryExecutePropagateExceptions(env, "KafkaTableSource");
-       }
-
-       /**
         * Serialization scheme forwarding byte[] records.
         */
        private static class ByteArraySerializationSchema implements 
KeyedSerializationSchema<byte[]> {

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e46ca08..baddab1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -38,39 +38,31 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase implements Serializable {
-
-       private final static String TOPIC = "testTopic";
-       private final static String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
-       private final static TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
-
-       private final KafkaPartitioner<Row> partitioner = new 
CustomPartitioner();
-       private final Properties properties = createSinkProperties();
+public abstract class KafkaTableSinkTestBase {
+
+       private static final String TOPIC = "testTopic";
+       private static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
+       private static final TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+       private static final KafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
+       private static final Properties PROPERTIES = createSinkProperties();
+       // we have to mock FlinkKafkaProducerBase as it cannot be instantiated 
without Kafka
        @SuppressWarnings("unchecked")
-       private final FlinkKafkaProducerBase<Row> kafkaProducer = 
mock(FlinkKafkaProducerBase.class);
+       private static final FlinkKafkaProducerBase<Row> PRODUCER = 
mock(FlinkKafkaProducerBase.class);
 
        @Test
        @SuppressWarnings("unchecked")
        public void testKafkaTableSink() throws Exception {
                DataStream dataStream = mock(DataStream.class);
-               KafkaTableSink kafkaTableSink = createTableSink();
-               kafkaTableSink.emitDataStream(dataStream);
-
-               verify(dataStream).addSink(kafkaProducer);
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testCreatedProducer() throws Exception {
-               DataStream dataStream = mock(DataStream.class);
                KafkaTableSink kafkaTableSink = spy(createTableSink());
                kafkaTableSink.emitDataStream(dataStream);
 
+               verify(dataStream).addSink(eq(PRODUCER));
+
                verify(kafkaTableSink).createKafkaProducer(
                        eq(TOPIC),
-                       eq(properties),
+                       eq(PROPERTIES),
                        any(getSerializationSchema()),
-                       eq(partitioner));
+                       eq(PARTITIONER));
        }
 
        @Test
@@ -90,12 +82,12 @@ public abstract class KafkaTableSinkTestBase implements Serializable {
        protected abstract Class<SerializationSchema<Row>> 
getSerializationSchema();
 
        private KafkaTableSink createTableSink() {
-               return createTableSink(TOPIC, properties, partitioner, 
kafkaProducer);
+               return createTableSink(TOPIC, PROPERTIES, PARTITIONER, 
PRODUCER);
        }
 
        private static Properties createSinkProperties() {
                Properties properties = new Properties();
-               properties.setProperty("testKey", "testValue");
+               properties.setProperty("bootstrap.servers", "localhost:12345");
                return properties;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
new file mode 100644
index 0000000..2a281e8
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSourceTestBase {
+
+       private static final String TOPIC = "testTopic";
+       private static final String[] FIELD_NAMES = new String[] { "long", 
"string", "boolean", "double", "missing-field" };
+       private static final TypeInformation<?>[] FIELD_TYPES = new 
TypeInformation<?>[] {
+               BasicTypeInfo.LONG_TYPE_INFO,
+               BasicTypeInfo.STRING_TYPE_INFO,
+               BasicTypeInfo.BOOLEAN_TYPE_INFO,
+               BasicTypeInfo.DOUBLE_TYPE_INFO,
+               BasicTypeInfo.LONG_TYPE_INFO };
+       private static final Properties PROPERTIES = createSourceProperties();
+
+       @Test
+       public void testKafkaTableSource() {
+               KafkaTableSource kafkaTableSource = spy(createTableSource());
+               StreamExecutionEnvironment env = 
mock(StreamExecutionEnvironment.class);
+               kafkaTableSource.getDataStream(env);
+
+               verify(env).addSource(any(getFlinkKafkaConsumer()));
+
+               verify(kafkaTableSource).getKafkaConsumer(
+                       eq(TOPIC),
+                       eq(PROPERTIES),
+                       any(getDeserializationSchema()));
+       }
+
+       protected abstract KafkaTableSource createTableSource(String topic, 
Properties properties,
+                       String[] fieldNames, TypeInformation<?>[] typeInfo);
+
+       protected abstract Class<DeserializationSchema<Row>> 
getDeserializationSchema();
+
+       protected abstract Class<FlinkKafkaConsumerBase<Row>> 
getFlinkKafkaConsumer();
+
+       private KafkaTableSource createTableSource() {
+               return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, 
FIELD_TYPES);
+       }
+
+       private static Properties createSourceProperties() {
+               Properties properties = new Properties();
+               properties.setProperty("zookeeper.connect", "dummy");
+               properties.setProperty("group.id", "dummy");
+               return properties;
+       }
+}

Reply via email to