Hi Benchao,

That worked! Pasting the build.gradle file here. However this only works
for 0.11 and it needs zookeeper.connect() which shouldn't be required. not
sure why it is required in Flink Kafka connector?  If I change the version
to 2.2 in the code and specify this jar

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

or

flinkShadowJar 
"org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"
//Not sure if I should use this one for Kafka >= 0.11

It doesn't work either.


buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url
"https://repository.apache.org/content/repositories/snapshots/"; }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since
then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive
dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the
"flinkShadowJar" configuration!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since
they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------

    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"

    // tried doesnt work. same problem
    //flinkShadowJar
"org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"

    // tried doesnt work. same problem
    //flinkShadowJar
"org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

    //tried doesnt work. same problem
    flinkShadowJar
"org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"

    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly
"org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    compileOnly
"org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar
"org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"


    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
    testImplementation
"org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    testImplementation
"org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': mainClassName
    }
}






On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote:

> I don't know how gradle works, but in Maven, packaging dependencies into
> one fat jar needs to specify how SPI property files should be dealt with,
> like
>
> <transformers>
>     <transformer 
> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> </transformers>
>
> Could you check that your final jar contains correct resource file?
>
> godfrey he <godfre...@gmail.com> 于2020年3月1日周日 下午5:25写道:
>
>> I think you should use `flink-sql-connector-kafka*-0.11*_2.11` instead
>> of `flink-connector-kafka_2.11`.
>>
>> Bests,
>> Godfrey
>>
>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午5:15写道:
>>
>>> The dependency was already there. Below is my build.gradle. Also I
>>> checked the kafka version and looks like the jar
>>>
>>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>
>>> downloads kafka-clients version 2.2.0. So I changed my code to version
>>> 2.2.0 and same problem persists.
>>>
>>> buildscript {
>>>     repositories {
>>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>>     }
>>>     dependencies {
>>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>>     }
>>> }
>>>
>>> plugins {
>>>     id 'java'
>>>     id 'application'
>>> }
>>>
>>> mainClassName = 'Test'
>>> apply plugin: 'com.github.johnrengelman.shadow'
>>>
>>> ext {
>>>     javaVersion = '1.8'
>>>     flinkVersion = '1.10.0'
>>>     scalaBinaryVersion = '2.11'
>>>     slf4jVersion = '1.7.7'
>>>     log4jVersion = '1.2.17'
>>> }
>>>
>>>
>>> sourceCompatibility = javaVersion
>>> targetCompatibility = javaVersion
>>> tasks.withType(JavaCompile) {
>>>     options.encoding = 'UTF-8'
>>> }
>>>
>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>
>>> // declare where to find the dependencies of your project
>>> repositories {
>>>     mavenCentral()
>>>     maven { url 
>>> "https://repository.apache.org/content/repositories/snapshots/"; }
>>> }
>>>
>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then 
>>> we could not run code
>>> // in the IDE or with "gradle run". We also cannot exclude transitive 
>>> dependencies from the
>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>> // -> Explicitly define the // libraries we want to be included in the 
>>> "flinkShadowJar" configuration!
>>> configurations {
>>>     flinkShadowJar // dependencies which go into the shadowJar
>>>
>>>     // always exclude these (also from transitive dependencies) since they 
>>> are provided by Flink
>>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 
>>> 'force-shading'
>>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
>>> 'jsr305'
>>>     flinkShadowJar.exclude group: 'org.slf4j'
>>>     flinkShadowJar.exclude group: 'log4j'
>>> }
>>>
>>> // declare the dependencies for your production and test code
>>> dependencies {
>>>     // --------------------------------------------------------------
>>>     // Compile-time dependencies that should NOT be part of the
>>>     // shadow jar and are provided in the lib folder of Flink
>>>     // --------------------------------------------------------------
>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>     compile 
>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>
>>>     // --------------------------------------------------------------
>>>     // Dependencies that should be part of the shadow jar, e.g.
>>>     // connectors. These must be in the flinkShadowJar configuration!
>>>     // --------------------------------------------------------------
>>>
>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>     compile 
>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>>
>>>     flinkShadowJar 
>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>>     compileOnly 
>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>     compileOnly 
>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>     flinkShadowJar 
>>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>>     flinkShadowJar 
>>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>>
>>>
>>>     compile "log4j:log4j:${log4jVersion}"
>>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>>
>>>     // Add test dependencies here.
>>>     // testCompile "junit:junit:4.12"
>>>     testImplementation 
>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>     testImplementation 
>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>> }
>>>
>>> // make compileOnly dependencies available for tests:
>>> sourceSets {
>>>     main.compileClasspath += configurations.flinkShadowJar
>>>     main.runtimeClasspath += configurations.flinkShadowJar
>>>
>>>     test.compileClasspath += configurations.flinkShadowJar
>>>     test.runtimeClasspath += configurations.flinkShadowJar
>>>
>>>     javadoc.classpath += configurations.flinkShadowJar
>>> }
>>>
>>> run.classpath = sourceSets.main.runtimeClasspath
>>>
>>> jar {
>>>     manifest {
>>>         attributes 'Built-By': System.getProperty('user.name'),
>>>                 'Build-Jdk': System.getProperty('java.version')
>>>     }
>>> }
>>>
>>> shadowJar {
>>>     configurations = [project.configurations.flinkShadowJar]
>>> }
>>>
>>>
>>>
>>>
>>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:
>>>
>>>> hi kant,
>>>>
>>>> > Also why do I need to convert to DataStream to print the rows of a
>>>> table? Why not have a print method in the Table itself?
>>>> Flink 1.10 introduces a utility class named TableUtils to convert a
>>>> Table to List<Row>, this utility class is mainly used for demonstration or
>>>> testing and is only applicable for *small batch jobs* and small finite 
>>>> *append
>>>> only stream jobs*.  code like:
>>>> Table table = tEnv.sqlQuery("select ...");
>>>> List<Row> result = TableUtils.collectToList(table);
>>>> result.....
>>>>
>>>> currently, we are planner to implement Table#collect[1], after
>>>> that Table#head and Table#print may be also introduced soon.
>>>>
>>>> >  The program finished with the following exception:
>>>> please make sure that the kafka version in Test class and the kafka
>>>> version in pom dependency are same. I tested your code successfully.
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>>>
>>>>
>>>> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道:
>>>>
>>>>> Hi kant,
>>>>>
>>>>> CSV format is an independent module, you need to add it as your
>>>>> dependency.
>>>>>
>>>>> <dependency>
>>>>>    <groupId>org.apache.flink</groupId>
>>>>>    <artifactId>flink-csv</artifactId>
>>>>>    <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道:
>>>>>
>>>>>> ------------------------------------------------------------
>>>>>>  The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>> method caused an error: findAndCreateTableSource failed.
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>> at
>>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>>> at
>>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>>> at
>>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>>> at
>>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>>> at
>>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>> findAndCreateTableSource failed.
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>>> at
>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>>> at
>>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>>> at
>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>>> at
>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>>> at
>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>>> at
>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>>> at
>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>>> at Test.main(Test.java:34)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>> ... 8 more
>>>>>> Caused by:
>>>>>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not 
>>>>>> find
>>>>>> a suitable table factory for
>>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>>> the classpath.
>>>>>>
>>>>>> Reason: Required context properties mismatch.
>>>>>>
>>>>>> The matching candidates:
>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>> Mismatched properties:
>>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>>>
>>>>>> The following properties are requested:
>>>>>> connector.property-version=1
>>>>>> connector.topic=test-topic1
>>>>>> connector.type=kafka
>>>>>> connector.version=0.11
>>>>>> format.property-version=1
>>>>>> format.type=csv
>>>>>> schema.0.data-type=VARCHAR(2147483647)
>>>>>> schema.0.name=f0
>>>>>> update-mode=append
>>>>>>
>>>>>> The following factories have been considered:
>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>> at
>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>>> ... 34 more
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>>> deprecated in Flink 1.10.
>>>>>>>
>>>>>>> It looks like not all of the documentation was updated after methods
>>>>>>> were deprecated. However if you look at the java docs of the
>>>>>>> `registerTableSink` method, you can find an answer [1].
>>>>>>>
>>>>>>> >  It sill doesnt work because it says for CSV the connector.type
>>>>>>> should be filesystem not Kafka.
>>>>>>>
>>>>>>> Can you post the full stack trace? As I’m not familiar with the
>>>>>>> Table API, maybe you Timo or Dawid know what’s going on here?
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>>>
>>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>> Here is my updated code after digging through the source code (not
>>>>>>> sure if it is correct ). It sill doesnt work because it says for CSV the
>>>>>>> connector.type should be filesystem not Kafka but documentation says it 
>>>>>>> is
>>>>>>> supported.
>>>>>>>
>>>>>>>
>>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>>> import org.apache.flink.runtime.state.StateBackend;
>>>>>>> import 
>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>> import org.apache.flink.table.descriptors.Csv;
>>>>>>> import org.apache.flink.table.descriptors.Kafka;
>>>>>>> import org.apache.flink.table.descriptors.Schema;
>>>>>>>
>>>>>>> public class Test {
>>>>>>>
>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>                 .useBlinkPlanner()
>>>>>>>                 .inStreamingMode()
>>>>>>>                 .build();
>>>>>>>
>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new 
>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>
>>>>>>>         tableEnvironment
>>>>>>>             .connect(
>>>>>>>                 new Kafka()
>>>>>>>                     .version("0.11")
>>>>>>>                     .topic("test-topic1")
>>>>>>>             )
>>>>>>>             .withFormat(new Csv())
>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>             .inAppendMode()
>>>>>>>             .createTemporaryTable("kafka_source");
>>>>>>>
>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>>>>> kafka_source");
>>>>>>>
>>>>>>>         tableEnvironment
>>>>>>>             .connect(
>>>>>>>                 new Kafka()
>>>>>>>                     .version("0.11")
>>>>>>>                     .topic("test-topic2")
>>>>>>>             )
>>>>>>>             .withFormat(new Csv())
>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>             .inAppendMode()
>>>>>>>             .createTemporaryTable("kafka_target");
>>>>>>>
>>>>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>>>>
>>>>>>>         tableEnvironment.execute("Sample Job");
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Benchao,
>>>>>>>>
>>>>>>>> Agreed a ConsoleSink is very useful but that is not the only
>>>>>>>> problem here. Documentation says use  tableEnv.registerTableSink all 
>>>>>>>> over
>>>>>>>> the place
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>>>> however that function is deprecated. so how do I add any other Sink?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi kant,
>>>>>>>>>
>>>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, you
>>>>>>>>> can implement one custom sink following this doc[1].
>>>>>>>>>
>>>>>>>>> IMO, an out-of-box print table sink is very useful, and I've
>>>>>>>>> created an issue[2] to track this.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>>>
>>>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a 
>>>>>>>>>> sink that
>>>>>>>>>> can print to stdout? what sink should I use to print to stdout and 
>>>>>>>>>> how do I
>>>>>>>>>> add it without converting into DataStream?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <
>>>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked
>>>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>>>
>>>>>>>>>>> You don’t have to convert DataStream into Table to read from
>>>>>>>>>>> Kafka in Table API. I guess you could, if you had used DataStream 
>>>>>>>>>>> API’s
>>>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>>>>
>>>>>>>>>>> But you should be able to use Kafka Table connector directly, as
>>>>>>>>>>> it is described in the docs [2][3].
>>>>>>>>>>>
>>>>>>>>>>> Piotrek
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>>>> [2]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>>>> [3]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>>>
>>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Also why do I need to convert to DataStream to print the rows of
>>>>>>>>>>> a table? Why not have a print method in the Table itself?
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> Do I need to use DataStream API or Table API to construct
>>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to 
>>>>>>>>>>>> console. And
>>>>>>>>>>>> yes I tried it with datastreams and it works fine but I want to do 
>>>>>>>>>>>> it using
>>>>>>>>>>>> Table related APIs. I don't see any documentation or a sample on 
>>>>>>>>>>>> how to
>>>>>>>>>>>> create Kafka table source or any other source using Table Source 
>>>>>>>>>>>> API's so
>>>>>>>>>>>> after some digging I wrote the following code. My ultimate goal is 
>>>>>>>>>>>> to avoid
>>>>>>>>>>>> Datastream API as much as possible and just use Table API & SQL 
>>>>>>>>>>>> but somehow
>>>>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL 
>>>>>>>>>>>> interface. am
>>>>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense to 
>>>>>>>>>>>> focus on
>>>>>>>>>>>> SQL interfaces for both streaming and batch?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> import 
>>>>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>>>> import 
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>> import 
>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>>>
>>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>>
>>>>>>>>>>>> public class Test {
>>>>>>>>>>>>
>>>>>>>>>>>>     public class MyDeserializationSchema extends 
>>>>>>>>>>>> AbstractDeserializationSchema<Row> {
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>>>         }
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>>>         EnvironmentSettings settings = 
>>>>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>
>>>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, 
>>>>>>>>>>>> settings);
>>>>>>>>>>>>
>>>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>>>         Table kafkaTable = 
>>>>>>>>>>>> tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", 
>>>>>>>>>>>> kafkaTable);
>>>>>>>>>>>>
>>>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * 
>>>>>>>>>>>> from kafka_source");
>>>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, 
>>>>>>>>>>>> Row.class).print();
>>>>>>>>>>>>
>>>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>>>         TableSchema tableSchema = 
>>>>>>>>>>>> TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>>>         properties.setProperty("bootstrap.servers", 
>>>>>>>>>>>> "localhost:9092");
>>>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", 
>>>>>>>>>>>> properties, new MyDeserializationSchema());
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I get the following error
>>>>>>>>>>>>
>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>
>>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>>>> serializable. The object probably contains or references non 
>>>>>>>>>>>> serializable
>>>>>>>>>>>> fields.
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>>>
>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>
>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>>>
>>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>>>
>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>>>
>>>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>>>
>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>>
>>>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Benchao Li
>>>>>>>>> School of Electronics Engineering and Computer Science, Peking 
>>>>>>>>> University
>>>>>>>>> Tel:+86-15650713730
>>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Benchao Li
>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>> Tel:+86-15650713730
>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>
>>>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>

Reply via email to