Hi Kant,

I think Dawid meant to not add the Kafka version number like this:

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"


On Sun, Mar 1, 2020 at 7:31 PM kant kodali <kanth...@gmail.com> wrote:

> * What went wrong:
> Could not determine the dependencies of task ':shadowJar'.
> > Could not resolve all dependencies for configuration ':flinkShadowJar'.
>    > Could not find
> org.apache.flink:flink-sql-connector-kafka_2.11:universal.
>      Searched in the following locations:
>        -
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>        -
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>        -
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>        -
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>      Required by:
>          project :
>
>
>
> On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Kant,
>>
>> If you want to use the *universal *kafka connector you use "universal"
>> for the version. The community decided to no longer distinguish different
>> kafka connector versions, but to use the newest kafka client version for
>> all versions of kafka 1.0+. So if you want to use the connector from
>> flink-sql-connector-kafka_2.11 use "universal" for the version.
>>
>> As for the collect/print sink. We do realize importance of the sink and
>> there were a few approaches to implement one. Including the TableUtils
>> mentioned by godfrey. It does not have strong consistency guarantees and is
>> recommended rather only for experiments/testing. There is also an ongoing
>> discussion how to implement such a sink for *both *batch and streaming
>> here:
>> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455
>>
>> Best,
>>
>> Dawid
>> On 01/03/2020 12:00, kant kodali wrote:
>>
>> Hi Benchao,
>>
>> That worked! Pasting the build.gradle file here. However this only works
>> for 0.11 and it needs zookeeper.connect() which shouldn't be required. not
>> sure why it is required in Flink Kafka connector?  If I change the version
>> to 2.2 in the code and specify this jar
>>
>> flinkShadowJar 
>> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>>
>> or
>>
>> flinkShadowJar 
>> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" //Not 
>> sure if I should use this one for Kafka >= 0.11
>>
>> It doesn't work either.
>>
>>
>> buildscript {    repositories {        jcenter() // this applies only to the 
>> Gradle 'Shadow' plugin    }    dependencies {        classpath 
>> 'com.github.jengelman.gradle.plugins:shadow:2.0.4'    }}plugins {    id 
>> 'java'    id 'application'}mainClassName = 'Test'apply plugin: 
>> 'com.github.johnrengelman.shadow'ext {    javaVersion = '1.8'    
>> flinkVersion = '1.10.0'    scalaBinaryVersion = '2.11'    slf4jVersion = 
>> '1.7.7'    log4jVersion = '1.2.17'}sourceCompatibility = 
>> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) {    
>> options.encoding = 'UTF-8'}applicationDefaultJvmArgs = 
>> ["-Dlog4j.configuration=log4j.properties"]
>> // declare where to find the dependencies of your projectrepositories {    
>> mavenCentral()
>>     maven { url 
>> "https://repository.apache.org/content/repositories/snapshots/"; }}// NOTE: 
>> We cannot use "compileOnly" or "shadow" configurations since then we could 
>> not run code// in the IDE or with "gradle run". We also cannot exclude 
>> transitive dependencies from the// shadowJar yet (see 
>> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly define 
>> the // libraries we want to be included in the "flinkShadowJar" 
>> configuration!configurations {    flinkShadowJar // dependencies which go 
>> into the shadowJar    // always exclude these (also from transitive 
>> dependencies) since they are provided by Flink    flinkShadowJar.exclude 
>> group: 'org.apache.flink', module: 'force-shading'    flinkShadowJar.exclude 
>> group: 'com.google.code.findbugs', module: 'jsr305'    
>> flinkShadowJar.exclude group: 'org.slf4j'    flinkShadowJar.exclude group: 
>> 'log4j'}// declare the dependencies for your production and test 
>> codedependencies {    // 
>> --------------------------------------------------------------    // 
>> Compile-time dependencies that should NOT be part of the    // shadow jar 
>> and are provided in the lib folder of Flink    // 
>> --------------------------------------------------------------    compile 
>> "org.apache.flink:flink-java:${flinkVersion}"    compile 
>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>     // --------------------------------------------------------------    // 
>> Dependencies that should be part of the shadow jar, e.g.    // connectors. 
>> These must be in the flinkShadowJar configuration!    // 
>> --------------------------------------------------------------    compile 
>> "org.apache.flink:flink-java:${flinkVersion}"    compile 
>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"    // tried 
>> doesnt work. same problem    //flinkShadowJar 
>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"    // tried 
>> doesnt work. same problem    //flinkShadowJar 
>> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"    //tried 
>> doesnt work. same problem    flinkShadowJar 
>> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"    
>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"    
>> compileOnly 
>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"    
>> compileOnly 
>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"    
>> flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}" 
>>    flinkShadowJar 
>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"    
>> compile "log4j:log4j:${log4jVersion}"    compile 
>> "org.slf4j:slf4j-log4j12:${slf4jVersion}"    // Add test dependencies here.  
>>   // testCompile "junit:junit:4.12"    testImplementation 
>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"    
>> testImplementation 
>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"}// make 
>> compileOnly dependencies available for tests:sourceSets {    
>> main.compileClasspath += configurations.flinkShadowJar
>>     main.runtimeClasspath += configurations.flinkShadowJar
>>
>>     test.compileClasspath += configurations.flinkShadowJar
>>     test.runtimeClasspath += configurations.flinkShadowJar
>>
>>     javadoc.classpath += configurations.flinkShadowJar}run.classpath = 
>> sourceSets.main.runtimeClasspathjar {    manifest {        attributes 
>> 'Built-By': System.getProperty('user.name'),
>>                 'Build-Jdk': System.getProperty('java.version')
>>     }}shadowJar {    configurations = [project.configurations.flinkShadowJar]
>>     mergeServiceFiles()
>>     manifest {        attributes 'Main-Class': mainClassName    }}
>>
>>
>>
>>
>>
>>
>> On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote:
>>
>>> I don't know how gradle works, but in Maven, packaging dependencies into
>>> one fat jar needs to specify how SPI property files should be dealt with,
>>> like
>>>
>>> <transformers>    <transformer 
>>> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers>
>>>
>>> Could you check that your final jar contains correct resource file?
>>>
>>> godfrey he <godfre...@gmail.com> 于2020年3月1日周日 下午5:25写道:
>>>
>>>> I think you should use `flink-sql-connector-kafka*-0.11*_2.11` instead
>>>> of `flink-connector-kafka_2.11`.
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午5:15写道:
>>>>
>>>>> The dependency was already there. Below is my build.gradle. Also I
>>>>> checked the kafka version and looks like the jar
>>>>>
>>>>> flinkShadowJar 
>>>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>>>
>>>>> downloads kafka-clients version 2.2.0. So I changed my code to version
>>>>> 2.2.0 and same problem persists.
>>>>>
>>>>> buildscript {    repositories {        jcenter() // this applies only to 
>>>>> the Gradle 'Shadow' plugin    }    dependencies {        classpath 
>>>>> 'com.github.jengelman.gradle.plugins:shadow:2.0.4'    }}plugins {    id 
>>>>> 'java'    id 'application'}mainClassName = 'Test'apply plugin: 
>>>>> 'com.github.johnrengelman.shadow'ext {    javaVersion = '1.8'    
>>>>> flinkVersion = '1.10.0'    scalaBinaryVersion = '2.11'    slf4jVersion = 
>>>>> '1.7.7'    log4jVersion = '1.2.17'}sourceCompatibility = 
>>>>> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) { 
>>>>>    options.encoding = 'UTF-8'}applicationDefaultJvmArgs = 
>>>>> ["-Dlog4j.configuration=log4j.properties"]
>>>>> // declare where to find the dependencies of your projectrepositories {   
>>>>>  mavenCentral()
>>>>>     maven { url 
>>>>> "https://repository.apache.org/content/repositories/snapshots/"; }}// 
>>>>> NOTE: We cannot use "compileOnly" or "shadow" configurations since then 
>>>>> we could not run code// in the IDE or with "gradle run". We also cannot 
>>>>> exclude transitive dependencies from the// shadowJar yet (see 
>>>>> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly 
>>>>> define the // libraries we want to be included in the "flinkShadowJar" 
>>>>> configuration!configurations {    flinkShadowJar // dependencies which go 
>>>>> into the shadowJar    // always exclude these (also from transitive 
>>>>> dependencies) since they are provided by Flink    flinkShadowJar.exclude 
>>>>> group: 'org.apache.flink', module: 'force-shading'    
>>>>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
>>>>> 'jsr305'    flinkShadowJar.exclude group: 'org.slf4j'    
>>>>> flinkShadowJar.exclude group: 'log4j'}// declare the dependencies for 
>>>>> your production and test codedependencies {    // 
>>>>> --------------------------------------------------------------    // 
>>>>> Compile-time dependencies that should NOT be part of the    // shadow jar 
>>>>> and are provided in the lib folder of Flink    // 
>>>>> --------------------------------------------------------------    compile 
>>>>> "org.apache.flink:flink-java:${flinkVersion}"    compile 
>>>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>>     // --------------------------------------------------------------    
>>>>> // Dependencies that should be part of the shadow jar, e.g.    // 
>>>>> connectors. These must be in the flinkShadowJar configuration!    // 
>>>>> --------------------------------------------------------------    compile 
>>>>> "org.apache.flink:flink-java:${flinkVersion}"    compile 
>>>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"    
>>>>> flinkShadowJar 
>>>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"    
>>>>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"    
>>>>> compileOnly 
>>>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"    
>>>>> compileOnly 
>>>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"    
>>>>> flinkShadowJar 
>>>>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"    
>>>>> flinkShadowJar 
>>>>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"    
>>>>> compile "log4j:log4j:${log4jVersion}"    compile 
>>>>> "org.slf4j:slf4j-log4j12:${slf4jVersion}"    // Add test dependencies 
>>>>> here.    // testCompile "junit:junit:4.12"    testImplementation 
>>>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"    
>>>>> testImplementation 
>>>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"}// make 
>>>>> compileOnly dependencies available for tests:sourceSets {    
>>>>> main.compileClasspath += configurations.flinkShadowJar
>>>>>     main.runtimeClasspath += configurations.flinkShadowJar
>>>>>
>>>>>     test.compileClasspath += configurations.flinkShadowJar
>>>>>     test.runtimeClasspath += configurations.flinkShadowJar
>>>>>
>>>>>     javadoc.classpath += configurations.flinkShadowJar}run.classpath = 
>>>>> sourceSets.main.runtimeClasspathjar {    manifest {        attributes 
>>>>> 'Built-By': System.getProperty('user.name'),
>>>>>                 'Build-Jdk': System.getProperty('java.version')
>>>>>     }}shadowJar {    configurations = 
>>>>> [project.configurations.flinkShadowJar]}
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> hi kant,
>>>>>>
>>>>>> > Also why do I need to convert to DataStream to print the rows of a
>>>>>> table? Why not have a print method in the Table itself?
>>>>>> Flink 1.10 introduces a utility class named TableUtils to convert a
>>>>>> Table to List<Row>, this utility class is mainly used for demonstration 
>>>>>> or
>>>>>> testing and is only applicable for *small batch jobs* and small
>>>>>> finite *append only stream jobs*.  code like:
>>>>>> Table table = tEnv.sqlQuery("select ...");
>>>>>> List<Row> result = TableUtils.collectToList(table);
>>>>>> result.....
>>>>>>
>>>>>> currently, we are planner to implement Table#collect[1], after
>>>>>> that Table#head and Table#print may be also introduced soon.
>>>>>>
>>>>>> >  The program finished with the following exception:
>>>>>> please make sure that the kafka version in Test class and the kafka
>>>>>> version in pom dependency are same. I tested your code successfully.
>>>>>>
>>>>>> Bests,
>>>>>> Godfrey
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>>>>>
>>>>>>
>>>>>> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道:
>>>>>>
>>>>>>> Hi kant,
>>>>>>>
>>>>>>> CSV format is an independent module, you need to add it as your
>>>>>>> dependency.
>>>>>>>
>>>>>>> <dependency>   <groupId>org.apache.flink</groupId>   
>>>>>>> <artifactId>flink-csv</artifactId>   
>>>>>>> <version>${flink.version}</version></dependency>
>>>>>>>
>>>>>>>
>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道:
>>>>>>>
>>>>>>>> ------------------------------------------------------------
>>>>>>>>  The program finished with the following exception:
>>>>>>>>
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>>>>> main method caused an error: findAndCreateTableSource failed.
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>> findAndCreateTableSource failed.
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>>>>> at
>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>>>>> at Test.main(Test.java:34)
>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>> at
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>> at
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>>>> ... 8 more
>>>>>>>> Caused by:
>>>>>>>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not 
>>>>>>>> find
>>>>>>>> a suitable table factory for
>>>>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>>>>> the classpath.
>>>>>>>>
>>>>>>>> Reason: Required context properties mismatch.
>>>>>>>>
>>>>>>>> The matching candidates:
>>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>>>> Mismatched properties:
>>>>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>>>>>
>>>>>>>> The following properties are requested:
>>>>>>>> connector.property-version=1
>>>>>>>> connector.topic=test-topic1
>>>>>>>> connector.type=kafka
>>>>>>>> connector.version=0.11
>>>>>>>> format.property-version=1
>>>>>>>> format.type=csv
>>>>>>>> schema.0.data-type=VARCHAR(2147483647)
>>>>>>>> schema.0.name=f0
>>>>>>>> update-mode=append
>>>>>>>>
>>>>>>>> The following factories have been considered:
>>>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>>>>> ... 34 more
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <
>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> > Thanks for the pointer. Looks like the documentation says to
>>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the method 
>>>>>>>>> is
>>>>>>>>> deprecated in Flink 1.10.
>>>>>>>>>
>>>>>>>>> It looks like not all of the documentation was updated after
>>>>>>>>> methods were deprecated. However if you look at the java docs of the
>>>>>>>>> `registerTableSink` method, you can find an answer [1].
>>>>>>>>>
>>>>>>>>> >  It sill doesnt work because it says for CSV the connector.type
>>>>>>>>> should be filesystem not Kafka.
>>>>>>>>>
>>>>>>>>> Can you post the full stack trace? As I’m not familiar with the
>>>>>>>>> Table API, maybe you Timo or Dawid know what’s going on here?
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>>>>>
>>>>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Here is my updated code after digging through the source code (not
>>>>>>>>> sure if it is correct ). It sill doesnt work because it says for CSV 
>>>>>>>>> the
>>>>>>>>> connector.type should be filesystem not Kafka but documentation says 
>>>>>>>>> it is
>>>>>>>>> supported.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> import 
>>>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import 
>>>>>>>>> org.apache.flink.runtime.state.StateBackend;import 
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import
>>>>>>>>>  org.apache.flink.table.api.DataTypes;import 
>>>>>>>>> org.apache.flink.table.api.EnvironmentSettings;import 
>>>>>>>>> org.apache.flink.table.api.Table;import 
>>>>>>>>> org.apache.flink.table.api.java.StreamTableEnvironment;import 
>>>>>>>>> org.apache.flink.table.descriptors.Csv;import 
>>>>>>>>> org.apache.flink.table.descriptors.Kafka;import 
>>>>>>>>> org.apache.flink.table.descriptors.Schema;public class Test {
>>>>>>>>>
>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>         EnvironmentSettings settings = 
>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>                 .build();        StreamExecutionEnvironment 
>>>>>>>>> streamExecutionEnvironment = 
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();        
>>>>>>>>> streamExecutionEnvironment.setStateBackend((StateBackend) new 
>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb"));        
>>>>>>>>> StreamTableEnvironment tableEnvironment = 
>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);  
>>>>>>>>>       tableEnvironment
>>>>>>>>>             .connect(
>>>>>>>>>                 new Kafka()
>>>>>>>>>                     .version("0.11")
>>>>>>>>>                     .topic("test-topic1")
>>>>>>>>>             )
>>>>>>>>>             .withFormat(new Csv())
>>>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>>>             .inAppendMode()
>>>>>>>>>             .createTemporaryTable("kafka_source");        Table 
>>>>>>>>> resultTable = tableEnvironment.sqlQuery("select * from 
>>>>>>>>> kafka_source");        tableEnvironment
>>>>>>>>>             .connect(
>>>>>>>>>                 new Kafka()
>>>>>>>>>                     .version("0.11")
>>>>>>>>>                     .topic("test-topic2")
>>>>>>>>>             )
>>>>>>>>>             .withFormat(new Csv())
>>>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>>>             .inAppendMode()
>>>>>>>>>             .createTemporaryTable("kafka_target");        
>>>>>>>>> tableEnvironment.insertInto("kafka_target", resultTable);        
>>>>>>>>> tableEnvironment.execute("Sample Job");    }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Benchao,
>>>>>>>>>>
>>>>>>>>>> Agreed a ConsoleSink is very useful but that is not the only
>>>>>>>>>> problem here. Documentation says use  tableEnv.registerTableSink all 
>>>>>>>>>> over
>>>>>>>>>> the place
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>>>>>> however that function is deprecated. so how do I add any other Sink?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi kant,
>>>>>>>>>>>
>>>>>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, you
>>>>>>>>>>> can implement one custom sink following this doc[1].
>>>>>>>>>>>
>>>>>>>>>>> IMO, an out-of-box print table sink is very useful, and I've
>>>>>>>>>>> created an issue[2] to track this.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>>>>>
>>>>>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the pointer. Looks like the documentation says to
>>>>>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the 
>>>>>>>>>>>> method is
>>>>>>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a 
>>>>>>>>>>>> sink that
>>>>>>>>>>>> can print to stdout? what sink should I use to print to stdout and 
>>>>>>>>>>>> how do I
>>>>>>>>>>>> add it without converting into DataStream?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <
>>>>>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked
>>>>>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>>>>>
>>>>>>>>>>>>> You don’t have to convert DataStream into Table to read from
>>>>>>>>>>>>> Kafka in Table API. I guess you could, if you had used DataStream 
>>>>>>>>>>>>> API’s
>>>>>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> But you should be able to use Kafka Table connector directly,
>>>>>>>>>>>>> as it is described in the docs [2][3].
>>>>>>>>>>>>>
>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>>>>>> [3]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also why do I need to convert to DataStream to print the rows
>>>>>>>>>>>>> of a table? Why not have a print method in the Table itself?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <
>>>>>>>>>>>>> kanth...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Do I need to use DataStream API or Table API to construct
>>>>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to 
>>>>>>>>>>>>>> console. And
>>>>>>>>>>>>>> yes I tried it with datastreams and it works fine but I want to 
>>>>>>>>>>>>>> do it using
>>>>>>>>>>>>>> Table related APIs. I don't see any documentation or a sample on 
>>>>>>>>>>>>>> how to
>>>>>>>>>>>>>> create Kafka table source or any other source using Table Source 
>>>>>>>>>>>>>> API's so
>>>>>>>>>>>>>> after some digging I wrote the following code. My ultimate goal 
>>>>>>>>>>>>>> is to avoid
>>>>>>>>>>>>>> Datastream API as much as possible and just use Table API & SQL 
>>>>>>>>>>>>>> but somehow
>>>>>>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL 
>>>>>>>>>>>>>> interface. am
>>>>>>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense 
>>>>>>>>>>>>>> to focus on
>>>>>>>>>>>>>> SQL interfaces for both streaming and batch?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import 
>>>>>>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;import
>>>>>>>>>>>>>>  
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import
>>>>>>>>>>>>>>  
>>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSource;import
>>>>>>>>>>>>>>  org.apache.flink.table.api.DataTypes;import 
>>>>>>>>>>>>>> org.apache.flink.table.api.EnvironmentSettings;import 
>>>>>>>>>>>>>> org.apache.flink.table.api.Table;import 
>>>>>>>>>>>>>> org.apache.flink.table.api.TableSchema;import 
>>>>>>>>>>>>>> org.apache.flink.table.api.java.StreamTableEnvironment;import 
>>>>>>>>>>>>>> org.apache.flink.table.sources.TableSource;import 
>>>>>>>>>>>>>> org.apache.flink.types.Row;import java.io.IOException;import 
>>>>>>>>>>>>>> java.util.Properties;public class Test {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public class MyDeserializationSchema extends 
>>>>>>>>>>>>>> AbstractDeserializationSchema<Row> {
>>>>>>>>>>>>>>         @Override        public Row deserialize(byte[] message) 
>>>>>>>>>>>>>> throws IOException {
>>>>>>>>>>>>>>             return Row.of(new String(message));        }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>>>>>         Test test = new Test();        EnvironmentSettings 
>>>>>>>>>>>>>> settings = EnvironmentSettings.newInstance()
>>>>>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>>>>>                 .build();        StreamExecutionEnvironment 
>>>>>>>>>>>>>> streamExecutionEnvironment = 
>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();        
>>>>>>>>>>>>>> StreamTableEnvironment tableEnvironment = 
>>>>>>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, 
>>>>>>>>>>>>>> settings);        TableSource tableSource = 
>>>>>>>>>>>>>> test.getKafkaTableSource();        Table kafkaTable = 
>>>>>>>>>>>>>> tableEnvironment.fromTableSource(tableSource);        
>>>>>>>>>>>>>> tableEnvironment.createTemporaryView("kafka_source", 
>>>>>>>>>>>>>> kafkaTable);        Table resultTable = 
>>>>>>>>>>>>>> tableEnvironment.sqlQuery("select * from kafka_source");        
>>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print(); 
>>>>>>>>>>>>>>        streamExecutionEnvironment.execute("Sample Job");    }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>>>>>         TableSchema tableSchema = 
>>>>>>>>>>>>>> TableSchema.builder().field("f0", DataTypes.STRING()).build();   
>>>>>>>>>>>>>>      Properties properties = new Properties();        
>>>>>>>>>>>>>> properties.setProperty("bootstrap.servers", "localhost:9092");   
>>>>>>>>>>>>>>      properties.setProperty("group.id", "test");        return 
>>>>>>>>>>>>>> new KafkaTableSource(tableSchema, "test-topic1", properties, new 
>>>>>>>>>>>>>> MyDeserializationSchema());    }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I get the following error
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>>>>>> serializable. The object probably contains or references non 
>>>>>>>>>>>>>> serializable
>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Benchao Li
>>>>>>>>>>> School of Electronics Engineering and Computer Science, Peking 
>>>>>>>>>>> UniversityTel:+86-15650713730
>>>>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Benchao Li
>>>>>>> School of Electronics Engineering and Computer Science, Peking 
>>>>>>> UniversityTel:+86-15650713730
>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>
>>>>>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking 
>>> UniversityTel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>

Reply via email to