Hi Kant, I think Dawid meant that you should not put the Kafka version number ("universal") into the dependency coordinate itself; "universal" is the value you give the connector descriptor, while the artifact keeps the Flink version.
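As a minimal sketch of where "universal" goes (using the same descriptor API and imports as the Test class further down this thread, and reusing the topic and bootstrap servers from your examples, so treat the exact values as placeholders):

    tableEnvironment
        .connect(
            new Kafka()
                .version("universal")  // the connector's Kafka version, not the Maven/Gradle artifact version
                .topic("test-topic1")
                .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(new Csv())
        .withSchema(new Schema().field("f0", DataTypes.STRING()))
        .inAppendMode()
        .createTemporaryTable("kafka_source");

and in build.gradle the coordinate keeps the Flink version: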
flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" On Sun, Mar 1, 2020 at 7:31 PM kant kodali <kanth...@gmail.com> wrote: > * What went wrong: > Could not determine the dependencies of task ':shadowJar'. > > Could not resolve all dependencies for configuration ':flinkShadowJar'. > > Could not find > org.apache.flink:flink-sql-connector-kafka_2.11:universal. > Searched in the following locations: > - > https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom > - > https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar > - > https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom > - > https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar > Required by: > project : > > > > On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <dwysakow...@apache.org> > wrote: > >> Hi Kant, >> >> If you want to use the *universal *kafka connector you use "universal" >> for the version. The community decided to no longer distinguish different >> kafka connector versions, but to use the newest kafka client version for >> all versions of kafka 1.0+. So if you want to use the connector from >> flink-sql-connector-kafka_2.11 use "universal" for the version. >> >> As for the collect/print sink. We do realize importance of the sink and >> there were a few approaches to implement one. Including the TableUtils >> mentioned by godfrey. It does not have strong consistency guarantees and is >> recommended rather only for experiments/testing. There is also an ongoing >> discussion how to implement such a sink for *both *batch and streaming >> here: >> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455 >> >> Best, >> >> Dawid >> On 01/03/2020 12:00, kant kodali wrote: >> >> Hi Benchao, >> >> That worked! Pasting the build.gradle file here. However this only works >> for 0.11 and it needs zookeeper.connect() which shouldn't be required. not >> sure why it is required in Flink Kafka connector? If I change the version >> to 2.2 in the code and specify this jar >> >> flinkShadowJar >> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" >> >> or >> >> flinkShadowJar >> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" //Not >> sure if I should use this one for Kafka >= 0.11 >> >> It doesn't work either. 
>> >> >> buildscript { repositories { jcenter() // this applies only to the >> Gradle 'Shadow' plugin } dependencies { classpath >> 'com.github.jengelman.gradle.plugins:shadow:2.0.4' }}plugins { id >> 'java' id 'application'}mainClassName = 'Test'apply plugin: >> 'com.github.johnrengelman.shadow'ext { javaVersion = '1.8' >> flinkVersion = '1.10.0' scalaBinaryVersion = '2.11' slf4jVersion = >> '1.7.7' log4jVersion = '1.2.17'}sourceCompatibility = >> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) { >> options.encoding = 'UTF-8'}applicationDefaultJvmArgs = >> ["-Dlog4j.configuration=log4j.properties"] >> // declare where to find the dependencies of your projectrepositories { >> mavenCentral() >> maven { url >> "https://repository.apache.org/content/repositories/snapshots/" }}// NOTE: >> We cannot use "compileOnly" or "shadow" configurations since then we could >> not run code// in the IDE or with "gradle run". We also cannot exclude >> transitive dependencies from the// shadowJar yet (see >> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly define >> the // libraries we want to be included in the "flinkShadowJar" >> configuration!configurations { flinkShadowJar // dependencies which go >> into the shadowJar // always exclude these (also from transitive >> dependencies) since they are provided by Flink flinkShadowJar.exclude >> group: 'org.apache.flink', module: 'force-shading' flinkShadowJar.exclude >> group: 'com.google.code.findbugs', module: 'jsr305' >> flinkShadowJar.exclude group: 'org.slf4j' flinkShadowJar.exclude group: >> 'log4j'}// declare the dependencies for your production and test >> codedependencies { // >> -------------------------------------------------------------- // >> Compile-time dependencies that should NOT be part of the // shadow jar >> and are provided in the lib folder of Flink // >> -------------------------------------------------------------- compile >> "org.apache.flink:flink-java:${flinkVersion}" compile >> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >> // -------------------------------------------------------------- // >> Dependencies that should be part of the shadow jar, e.g. // connectors. >> These must be in the flinkShadowJar configuration! // >> -------------------------------------------------------------- compile >> "org.apache.flink:flink-java:${flinkVersion}" compile >> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}" // tried >> doesnt work. same problem //flinkShadowJar >> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" // tried >> doesnt work. same problem //flinkShadowJar >> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" //tried >> doesnt work. same problem flinkShadowJar >> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" >> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}" >> compileOnly >> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >> compileOnly >> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}" >> flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}" >> flinkShadowJar >> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}" >> compile "log4j:log4j:${log4jVersion}" compile >> "org.slf4j:slf4j-log4j12:${slf4jVersion}" // Add test dependencies here. 
>> // testCompile "junit:junit:4.12" testImplementation >> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >> testImplementation >> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"}// make >> compileOnly dependencies available for tests:sourceSets { >> main.compileClasspath += configurations.flinkShadowJar >> main.runtimeClasspath += configurations.flinkShadowJar >> >> test.compileClasspath += configurations.flinkShadowJar >> test.runtimeClasspath += configurations.flinkShadowJar >> >> javadoc.classpath += configurations.flinkShadowJar}run.classpath = >> sourceSets.main.runtimeClasspathjar { manifest { attributes >> 'Built-By': System.getProperty('user.name'), >> 'Build-Jdk': System.getProperty('java.version') >> }}shadowJar { configurations = [project.configurations.flinkShadowJar] >> mergeServiceFiles() >> manifest { attributes 'Main-Class': mainClassName }} >> >> >> >> >> >> >> On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote: >> >>> I don't know how gradle works, but in Maven, packaging dependencies into >>> one fat jar needs to specify how SPI property files should be dealt with, >>> like >>> >>> <transformers> <transformer >>> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers> >>> >>> Could you check that your final jar contains correct resource file? >>> >>> godfrey he <godfre...@gmail.com> 于2020年3月1日周日 下午5:25写道: >>> >>>> I think you should use `flink-sql-connector-kafka*-0.11*_2.11` instead >>>> of `flink-connector-kafka_2.11`. >>>> >>>> Bests, >>>> Godfrey >>>> >>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午5:15写道: >>>> >>>>> The dependency was already there. Below is my build.gradle. Also I >>>>> checked the kafka version and looks like the jar >>>>> >>>>> flinkShadowJar >>>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" >>>>> >>>>> downloads kafka-clients version 2.2.0. So I changed my code to version >>>>> 2.2.0 and same problem persists. >>>>> >>>>> buildscript { repositories { jcenter() // this applies only to >>>>> the Gradle 'Shadow' plugin } dependencies { classpath >>>>> 'com.github.jengelman.gradle.plugins:shadow:2.0.4' }}plugins { id >>>>> 'java' id 'application'}mainClassName = 'Test'apply plugin: >>>>> 'com.github.johnrengelman.shadow'ext { javaVersion = '1.8' >>>>> flinkVersion = '1.10.0' scalaBinaryVersion = '2.11' slf4jVersion = >>>>> '1.7.7' log4jVersion = '1.2.17'}sourceCompatibility = >>>>> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) { >>>>> options.encoding = 'UTF-8'}applicationDefaultJvmArgs = >>>>> ["-Dlog4j.configuration=log4j.properties"] >>>>> // declare where to find the dependencies of your projectrepositories { >>>>> mavenCentral() >>>>> maven { url >>>>> "https://repository.apache.org/content/repositories/snapshots/" }}// >>>>> NOTE: We cannot use "compileOnly" or "shadow" configurations since then >>>>> we could not run code// in the IDE or with "gradle run". 
We also cannot >>>>> exclude transitive dependencies from the// shadowJar yet (see >>>>> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly >>>>> define the // libraries we want to be included in the "flinkShadowJar" >>>>> configuration!configurations { flinkShadowJar // dependencies which go >>>>> into the shadowJar // always exclude these (also from transitive >>>>> dependencies) since they are provided by Flink flinkShadowJar.exclude >>>>> group: 'org.apache.flink', module: 'force-shading' >>>>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: >>>>> 'jsr305' flinkShadowJar.exclude group: 'org.slf4j' >>>>> flinkShadowJar.exclude group: 'log4j'}// declare the dependencies for >>>>> your production and test codedependencies { // >>>>> -------------------------------------------------------------- // >>>>> Compile-time dependencies that should NOT be part of the // shadow jar >>>>> and are provided in the lib folder of Flink // >>>>> -------------------------------------------------------------- compile >>>>> "org.apache.flink:flink-java:${flinkVersion}" compile >>>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >>>>> // -------------------------------------------------------------- >>>>> // Dependencies that should be part of the shadow jar, e.g. // >>>>> connectors. These must be in the flinkShadowJar configuration! // >>>>> -------------------------------------------------------------- compile >>>>> "org.apache.flink:flink-java:${flinkVersion}" compile >>>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >>>>> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}" >>>>> flinkShadowJar >>>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" >>>>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}" >>>>> compileOnly >>>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >>>>> compileOnly >>>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}" >>>>> flinkShadowJar >>>>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}" >>>>> flinkShadowJar >>>>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}" >>>>> compile "log4j:log4j:${log4jVersion}" compile >>>>> "org.slf4j:slf4j-log4j12:${slf4jVersion}" // Add test dependencies >>>>> here. // testCompile "junit:junit:4.12" testImplementation >>>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >>>>> testImplementation >>>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"}// make >>>>> compileOnly dependencies available for tests:sourceSets { >>>>> main.compileClasspath += configurations.flinkShadowJar >>>>> main.runtimeClasspath += configurations.flinkShadowJar >>>>> >>>>> test.compileClasspath += configurations.flinkShadowJar >>>>> test.runtimeClasspath += configurations.flinkShadowJar >>>>> >>>>> javadoc.classpath += configurations.flinkShadowJar}run.classpath = >>>>> sourceSets.main.runtimeClasspathjar { manifest { attributes >>>>> 'Built-By': System.getProperty('user.name'), >>>>> 'Build-Jdk': System.getProperty('java.version') >>>>> }}shadowJar { configurations = >>>>> [project.configurations.flinkShadowJar]} >>>>> >>>>> >>>>> >>>>> >>>>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> >>>>> wrote: >>>>> >>>>>> hi kant, >>>>>> >>>>>> > Also why do I need to convert to DataStream to print the rows of a >>>>>> table? Why not have a print method in the Table itself? 
>>>>>> Flink 1.10 introduces a utility class named TableUtils to convert a >>>>>> Table to List<Row>, this utility class is mainly used for demonstration >>>>>> or >>>>>> testing and is only applicable for *small batch jobs* and small >>>>>> finite *append only stream jobs*. code like: >>>>>> Table table = tEnv.sqlQuery("select ..."); >>>>>> List<Row> result = TableUtils.collectToList(table); >>>>>> result..... >>>>>> >>>>>> currently, we are planner to implement Table#collect[1], after >>>>>> that Table#head and Table#print may be also introduced soon. >>>>>> >>>>>> > The program finished with the following exception: >>>>>> please make sure that the kafka version in Test class and the kafka >>>>>> version in pom dependency are same. I tested your code successfully. >>>>>> >>>>>> Bests, >>>>>> Godfrey >>>>>> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14807 >>>>>> >>>>>> >>>>>> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道: >>>>>> >>>>>>> Hi kant, >>>>>>> >>>>>>> CSV format is an independent module, you need to add it as your >>>>>>> dependency. >>>>>>> >>>>>>> <dependency> <groupId>org.apache.flink</groupId> >>>>>>> <artifactId>flink-csv</artifactId> >>>>>>> <version>${flink.version}</version></dependency> >>>>>>> >>>>>>> >>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道: >>>>>>> >>>>>>>> ------------------------------------------------------------ >>>>>>>> The program finished with the following exception: >>>>>>>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >>>>>>>> main method caused an error: findAndCreateTableSource failed. >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >>>>>>>> at >>>>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >>>>>>>> at >>>>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >>>>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >>>>>>>> at >>>>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >>>>>>>> at >>>>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >>>>>>>> at >>>>>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >>>>>>>> Caused by: org.apache.flink.table.api.TableException: >>>>>>>> findAndCreateTableSource failed. 
>>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) >>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>>>>>>> at >>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>>>>>>> at >>>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>>>>>>> at >>>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) >>>>>>>> at Test.main(Test.java:34) >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>> at >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >>>>>>>> ... 8 more >>>>>>>> Caused by: >>>>>>>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not >>>>>>>> find >>>>>>>> a suitable table factory for >>>>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in >>>>>>>> the classpath. >>>>>>>> >>>>>>>> Reason: Required context properties mismatch. 
>>>>>>>> >>>>>>>> The matching candidates: >>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>>>>> Mismatched properties: >>>>>>>> 'connector.type' expects 'filesystem', but is 'kafka' >>>>>>>> >>>>>>>> The following properties are requested: >>>>>>>> connector.property-version=1 >>>>>>>> connector.topic=test-topic1 >>>>>>>> connector.type=kafka >>>>>>>> connector.version=0.11 >>>>>>>> format.property-version=1 >>>>>>>> format.type=csv >>>>>>>> schema.0.data-type=VARCHAR(2147483647) >>>>>>>> schema.0.name=f0 >>>>>>>> update-mode=append >>>>>>>> >>>>>>>> The following factories have been considered: >>>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory >>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >>>>>>>> at >>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >>>>>>>> ... 34 more >>>>>>>> >>>>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski < >>>>>>>> pi...@ververica.com> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> > Thanks for the pointer. Looks like the documentation says to >>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the method >>>>>>>>> is >>>>>>>>> deprecated in Flink 1.10. >>>>>>>>> >>>>>>>>> It looks like not all of the documentation was updated after >>>>>>>>> methods were deprecated. However if you look at the java docs of the >>>>>>>>> `registerTableSink` method, you can find an answer [1]. >>>>>>>>> >>>>>>>>> > It sill doesnt work because it says for CSV the connector.type >>>>>>>>> should be filesystem not Kafka. >>>>>>>>> >>>>>>>>> Can you post the full stack trace? As I’m not familiar with the >>>>>>>>> Table API, maybe you Timo or Dawid know what’s going on here? >>>>>>>>> >>>>>>>>> Piotrek >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink- >>>>>>>>> >>>>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote: >>>>>>>>> >>>>>>>>> Here is my updated code after digging through the source code (not >>>>>>>>> sure if it is correct ). It sill doesnt work because it says for CSV >>>>>>>>> the >>>>>>>>> connector.type should be filesystem not Kafka but documentation says >>>>>>>>> it is >>>>>>>>> supported. 
>>>>>>>>> >>>>>>>>> >>>>>>>>> import >>>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import >>>>>>>>> org.apache.flink.runtime.state.StateBackend;import >>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import >>>>>>>>> org.apache.flink.table.api.DataTypes;import >>>>>>>>> org.apache.flink.table.api.EnvironmentSettings;import >>>>>>>>> org.apache.flink.table.api.Table;import >>>>>>>>> org.apache.flink.table.api.java.StreamTableEnvironment;import >>>>>>>>> org.apache.flink.table.descriptors.Csv;import >>>>>>>>> org.apache.flink.table.descriptors.Kafka;import >>>>>>>>> org.apache.flink.table.descriptors.Schema;public class Test { >>>>>>>>> >>>>>>>>> public static void main(String... args) throws Exception { >>>>>>>>> EnvironmentSettings settings = >>>>>>>>> EnvironmentSettings.newInstance() >>>>>>>>> .useBlinkPlanner() >>>>>>>>> .inStreamingMode() >>>>>>>>> .build(); StreamExecutionEnvironment >>>>>>>>> streamExecutionEnvironment = >>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>> streamExecutionEnvironment.setStateBackend((StateBackend) new >>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb")); >>>>>>>>> StreamTableEnvironment tableEnvironment = >>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings); >>>>>>>>> tableEnvironment >>>>>>>>> .connect( >>>>>>>>> new Kafka() >>>>>>>>> .version("0.11") >>>>>>>>> .topic("test-topic1") >>>>>>>>> ) >>>>>>>>> .withFormat(new Csv()) >>>>>>>>> .withSchema(new Schema().field("f0", DataTypes.STRING())) >>>>>>>>> .inAppendMode() >>>>>>>>> .createTemporaryTable("kafka_source"); Table >>>>>>>>> resultTable = tableEnvironment.sqlQuery("select * from >>>>>>>>> kafka_source"); tableEnvironment >>>>>>>>> .connect( >>>>>>>>> new Kafka() >>>>>>>>> .version("0.11") >>>>>>>>> .topic("test-topic2") >>>>>>>>> ) >>>>>>>>> .withFormat(new Csv()) >>>>>>>>> .withSchema(new Schema().field("f0", DataTypes.STRING())) >>>>>>>>> .inAppendMode() >>>>>>>>> .createTemporaryTable("kafka_target"); >>>>>>>>> tableEnvironment.insertInto("kafka_target", resultTable); >>>>>>>>> tableEnvironment.execute("Sample Job"); } >>>>>>>>> } >>>>>>>>> >>>>>>>>> >>>>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Benchao, >>>>>>>>>> >>>>>>>>>> Agreed a ConsoleSink is very useful but that is not the only >>>>>>>>>> problem here. Documentation says use tableEnv.registerTableSink all >>>>>>>>>> over >>>>>>>>>> the place >>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink >>>>>>>>>> however that function is deprecated. so how do I add any other Sink? >>>>>>>>>> >>>>>>>>>> Thanks! >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi kant, >>>>>>>>>>> >>>>>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, you >>>>>>>>>>> can implement one custom sink following this doc[1]. >>>>>>>>>>> >>>>>>>>>>> IMO, an out-of-box print table sink is very useful, and I've >>>>>>>>>>> created an issue[2] to track this. 
>>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354 >>>>>>>>>>> >>>>>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道: >>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for the pointer. Looks like the documentation says to >>>>>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the >>>>>>>>>>>> method is >>>>>>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a >>>>>>>>>>>> sink that >>>>>>>>>>>> can print to stdout? what sink should I use to print to stdout and >>>>>>>>>>>> how do I >>>>>>>>>>>> add it without converting into DataStream? >>>>>>>>>>>> >>>>>>>>>>>> Thanks! >>>>>>>>>>>> >>>>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski < >>>>>>>>>>>> pi...@ververica.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> >>>>>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked >>>>>>>>>>>>> @Internal. It’s not part of any public API. >>>>>>>>>>>>> >>>>>>>>>>>>> You don’t have to convert DataStream into Table to read from >>>>>>>>>>>>> Kafka in Table API. I guess you could, if you had used DataStream >>>>>>>>>>>>> API’s >>>>>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1]. >>>>>>>>>>>>> >>>>>>>>>>>>> But you should be able to use Kafka Table connector directly, >>>>>>>>>>>>> as it is described in the docs [2][3]. >>>>>>>>>>>>> >>>>>>>>>>>>> Piotrek >>>>>>>>>>>>> >>>>>>>>>>>>> [1] >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html >>>>>>>>>>>>> [2] >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview >>>>>>>>>>>>> [3] >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector >>>>>>>>>>>>> >>>>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Also why do I need to convert to DataStream to print the rows >>>>>>>>>>>>> of a table? Why not have a print method in the Table itself? >>>>>>>>>>>>> >>>>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali < >>>>>>>>>>>>> kanth...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi All, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Do I need to use DataStream API or Table API to construct >>>>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to >>>>>>>>>>>>>> console. And >>>>>>>>>>>>>> yes I tried it with datastreams and it works fine but I want to >>>>>>>>>>>>>> do it using >>>>>>>>>>>>>> Table related APIs. I don't see any documentation or a sample on >>>>>>>>>>>>>> how to >>>>>>>>>>>>>> create Kafka table source or any other source using Table Source >>>>>>>>>>>>>> API's so >>>>>>>>>>>>>> after some digging I wrote the following code. My ultimate goal >>>>>>>>>>>>>> is to avoid >>>>>>>>>>>>>> Datastream API as much as possible and just use Table API & SQL >>>>>>>>>>>>>> but somehow >>>>>>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL >>>>>>>>>>>>>> interface. am >>>>>>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense >>>>>>>>>>>>>> to focus on >>>>>>>>>>>>>> SQL interfaces for both streaming and batch? 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> import >>>>>>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;import >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSource;import >>>>>>>>>>>>>> org.apache.flink.table.api.DataTypes;import >>>>>>>>>>>>>> org.apache.flink.table.api.EnvironmentSettings;import >>>>>>>>>>>>>> org.apache.flink.table.api.Table;import >>>>>>>>>>>>>> org.apache.flink.table.api.TableSchema;import >>>>>>>>>>>>>> org.apache.flink.table.api.java.StreamTableEnvironment;import >>>>>>>>>>>>>> org.apache.flink.table.sources.TableSource;import >>>>>>>>>>>>>> org.apache.flink.types.Row;import java.io.IOException;import >>>>>>>>>>>>>> java.util.Properties;public class Test { >>>>>>>>>>>>>> >>>>>>>>>>>>>> public class MyDeserializationSchema extends >>>>>>>>>>>>>> AbstractDeserializationSchema<Row> { >>>>>>>>>>>>>> @Override public Row deserialize(byte[] message) >>>>>>>>>>>>>> throws IOException { >>>>>>>>>>>>>> return Row.of(new String(message)); } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> public static void main(String... args) throws Exception { >>>>>>>>>>>>>> Test test = new Test(); EnvironmentSettings >>>>>>>>>>>>>> settings = EnvironmentSettings.newInstance() >>>>>>>>>>>>>> .useBlinkPlanner() >>>>>>>>>>>>>> .inStreamingMode() >>>>>>>>>>>>>> .build(); StreamExecutionEnvironment >>>>>>>>>>>>>> streamExecutionEnvironment = >>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>>>>>> StreamTableEnvironment tableEnvironment = >>>>>>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, >>>>>>>>>>>>>> settings); TableSource tableSource = >>>>>>>>>>>>>> test.getKafkaTableSource(); Table kafkaTable = >>>>>>>>>>>>>> tableEnvironment.fromTableSource(tableSource); >>>>>>>>>>>>>> tableEnvironment.createTemporaryView("kafka_source", >>>>>>>>>>>>>> kafkaTable); Table resultTable = >>>>>>>>>>>>>> tableEnvironment.sqlQuery("select * from kafka_source"); >>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print(); >>>>>>>>>>>>>> streamExecutionEnvironment.execute("Sample Job"); } >>>>>>>>>>>>>> >>>>>>>>>>>>>> public KafkaTableSource getKafkaTableSource() { >>>>>>>>>>>>>> TableSchema tableSchema = >>>>>>>>>>>>>> TableSchema.builder().field("f0", DataTypes.STRING()).build(); >>>>>>>>>>>>>> Properties properties = new Properties(); >>>>>>>>>>>>>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>>>>>>>>>>>>> properties.setProperty("group.id", "test"); return >>>>>>>>>>>>>> new KafkaTableSource(tableSchema, "test-topic1", properties, new >>>>>>>>>>>>>> MyDeserializationSchema()); } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> I get the following error >>>>>>>>>>>>>> >>>>>>>>>>>>>> The program finished with the following exception: >>>>>>>>>>>>>> >>>>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not >>>>>>>>>>>>>> serializable. The object probably contains or references non >>>>>>>>>>>>>> serializable >>>>>>>>>>>>>> fields. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) >>>>>>>>>>>>>> >>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>>>>>>>>>> >>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>>>>>>>>> >>>>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>>>>>>>>> >>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >>>>>>>>>>>>>> >>>>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104) >>>>>>>>>>>>>> >>>>>>>>>>>>>> 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259) >>>>>>>>>>>>>> >>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250) >>>>>>>>>>>>>> Test.main(Test.java:40) >>>>>>>>>>>>>> >>>>>>>>>>>>>> The error seems to be on the line >>>>>>>>>>>>>> >>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print(); >>>>>>>>>>>>>> >>>>>>>>>>>>>> and I am not sure why it is not able to serialize? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks! >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> >>>>>>>>>>> Benchao Li >>>>>>>>>>> School of Electronics Engineering and Computer Science, Peking >>>>>>>>>>> UniversityTel:+86-15650713730 >>>>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> >>>>>>> Benchao Li >>>>>>> School of Electronics Engineering and Computer Science, Peking >>>>>>> UniversityTel:+86-15650713730 >>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>>>>>> >>>>>>> >>> >>> -- >>> >>> Benchao Li >>> School of Electronics Engineering and Computer Science, Peking >>> UniversityTel:+86-15650713730 >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>> >>>