The dependency was already there. Below is my build.gradle. Also I checked
the kafka version and looks like the jar

flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"

downloads kafka-clients version 2.2.0. So I changed my code to version
2.2.0 and same problem persists.

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url
"https://repository.apache.org/content/repositories/snapshots/"; }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since
then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive
dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the
"flinkShadowJar" configuration!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since
they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------

    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"

    flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly
"org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    compileOnly
"org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar
"org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"


    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
    testImplementation
"org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    testImplementation
"org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
}




On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:

> hi kant,
>
> > Also why do I need to convert to DataStream to print the rows of a
> table? Why not have a print method in the Table itself?
> Flink 1.10 introduces a utility class named TableUtils to convert a Table
> to List<Row>, this utility class is mainly used for demonstration or
> testing and is only applicable for *small batch jobs* and small finite *append
> only stream jobs*.  code like:
> Table table = tEnv.sqlQuery("select ...");
> List<Row> result = TableUtils.collectToList(table);
> result.....
>
> currently, we are planner to implement Table#collect[1], after
> that Table#head and Table#print may be also introduced soon.
>
> >  The program finished with the following exception:
> please make sure that the kafka version in Test class and the kafka
> version in pom dependency are same. I tested your code successfully.
>
> Bests,
> Godfrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-14807
>
>
> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道:
>
>> Hi kant,
>>
>> CSV format is an independent module, you need to add it as your
>> dependency.
>>
>> <dependency>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-csv</artifactId>
>>    <version>${flink.version}</version>
>> </dependency>
>>
>>
>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道:
>>
>>> ------------------------------------------------------------
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: findAndCreateTableSource failed.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>> Caused by: org.apache.flink.table.api.TableException:
>>> findAndCreateTableSource failed.
>>> at
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>> at
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>> at Test.main(Test.java:34)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>> ... 8 more
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>> Could not find a suitable table factory for
>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>> the classpath.
>>>
>>> Reason: Required context properties mismatch.
>>>
>>> The matching candidates:
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> Mismatched properties:
>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>
>>> The following properties are requested:
>>> connector.property-version=1
>>> connector.topic=test-topic1
>>> connector.type=kafka
>>> connector.version=0.11
>>> format.property-version=1
>>> format.type=csv
>>> schema.0.data-type=VARCHAR(2147483647)
>>> schema.0.name=f0
>>> update-mode=append
>>>
>>> The following factories have been considered:
>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> at
>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>> at
>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>> at
>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>> at
>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>> at
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>> ... 34 more
>>>
>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>> deprecated in Flink 1.10.
>>>>
>>>> It looks like not all of the documentation was updated after methods
>>>> were deprecated. However if you look at the java docs of the
>>>> `registerTableSink` method, you can find an answer [1].
>>>>
>>>> >  It sill doesnt work because it says for CSV the connector.type
>>>> should be filesystem not Kafka.
>>>>
>>>> Can you post the full stack trace? As I’m not familiar with the Table
>>>> API, maybe you Timo or Dawid know what’s going on here?
>>>>
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>
>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>> Here is my updated code after digging through the source code (not sure
>>>> if it is correct ). It sill doesnt work because it says for CSV the
>>>> connector.type should be filesystem not Kafka but documentation says it is
>>>> supported.
>>>>
>>>>
>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>> import org.apache.flink.runtime.state.StateBackend;
>>>> import 
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.table.api.DataTypes;
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.table.descriptors.Csv;
>>>> import org.apache.flink.table.descriptors.Kafka;
>>>> import org.apache.flink.table.descriptors.Schema;
>>>>
>>>> public class Test {
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>                 .useBlinkPlanner()
>>>>                 .inStreamingMode()
>>>>                 .build();
>>>>
>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new 
>>>> RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>         StreamTableEnvironment tableEnvironment = 
>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>
>>>>         tableEnvironment
>>>>             .connect(
>>>>                 new Kafka()
>>>>                     .version("0.11")
>>>>                     .topic("test-topic1")
>>>>             )
>>>>             .withFormat(new Csv())
>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>             .inAppendMode()
>>>>             .createTemporaryTable("kafka_source");
>>>>
>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>> kafka_source");
>>>>
>>>>         tableEnvironment
>>>>             .connect(
>>>>                 new Kafka()
>>>>                     .version("0.11")
>>>>                     .topic("test-topic2")
>>>>             )
>>>>             .withFormat(new Csv())
>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>             .inAppendMode()
>>>>             .createTemporaryTable("kafka_target");
>>>>
>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>
>>>>         tableEnvironment.execute("Sample Job");
>>>>     }
>>>> }
>>>>
>>>>
>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi Benchao,
>>>>>
>>>>> Agreed a ConsoleSink is very useful but that is not the only problem
>>>>> here. Documentation says use  tableEnv.registerTableSink all over the 
>>>>> place
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>> however that function is deprecated. so how do I add any other Sink?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi kant,
>>>>>>
>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, you can
>>>>>> implement one custom sink following this doc[1].
>>>>>>
>>>>>> IMO, an out-of-box print table sink is very useful, and I've created
>>>>>> an issue[2] to track this.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>
>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink 
>>>>>>> that
>>>>>>> can print to stdout? what sink should I use to print to stdout and how 
>>>>>>> do I
>>>>>>> add it without converting into DataStream?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
>>>>>>>> It’s not part of any public API.
>>>>>>>>
>>>>>>>> You don’t have to convert DataStream into Table to read from Kafka
>>>>>>>> in Table API. I guess you could, if you had used DataStream API’s
>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>
>>>>>>>> But you should be able to use Kafka Table connector directly, as it
>>>>>>>> is described in the docs [2][3].
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>> [3]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>
>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Also why do I need to convert to DataStream to print the rows of a
>>>>>>>> table? Why not have a print method in the Table itself?
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> Do I need to use DataStream API or Table API to construct sources?
>>>>>>>>> I am just trying to read from Kafka and print it to console. And yes I
>>>>>>>>> tried it with datastreams and it works fine but I want to do it using 
>>>>>>>>> Table
>>>>>>>>> related APIs. I don't see any documentation or a sample on how to 
>>>>>>>>> create
>>>>>>>>> Kafka table source or any other source using Table Source API's so 
>>>>>>>>> after
>>>>>>>>> some digging I wrote the following code. My ultimate goal is to avoid
>>>>>>>>> Datastream API as much as possible and just use Table API & SQL but 
>>>>>>>>> somehow
>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL 
>>>>>>>>> interface. am
>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense to 
>>>>>>>>> focus on
>>>>>>>>> SQL interfaces for both streaming and batch?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> import 
>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>> import 
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>
>>>>>>>>> import java.io.IOException;
>>>>>>>>> import java.util.Properties;
>>>>>>>>>
>>>>>>>>> public class Test {
>>>>>>>>>
>>>>>>>>>     public class MyDeserializationSchema extends 
>>>>>>>>> AbstractDeserializationSchema<Row> {
>>>>>>>>>         @Override
>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>         Test test = new Test();
>>>>>>>>>         EnvironmentSettings settings = 
>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>                 .build();
>>>>>>>>>
>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>
>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>         Table kafkaTable = 
>>>>>>>>> tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", 
>>>>>>>>> kafkaTable);
>>>>>>>>>
>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>>>>>>> kafka_source");
>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, 
>>>>>>>>> Row.class).print();
>>>>>>>>>
>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", 
>>>>>>>>> DataTypes.STRING()).build();
>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", 
>>>>>>>>> properties, new MyDeserializationSchema());
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I get the following error
>>>>>>>>>
>>>>>>>>> The program finished with the following exception:
>>>>>>>>>
>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>> serializable. The object probably contains or references non 
>>>>>>>>> serializable
>>>>>>>>> fields.
>>>>>>>>>
>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>
>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>
>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>
>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>
>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>
>>>>>>>>> The error seems to be on the line
>>>>>>>>>
>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>
>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Benchao Li
>>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>>> Tel:+86-15650713730
>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>
>>>>>>
>>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

Reply via email to