Hi,

> Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink; however, in my IDE the method shows as
> deprecated in Flink 1.10.

It looks like not all of the documentation was updated after methods were
deprecated. However, if you look at the Javadocs of the `registerTableSink`
method, you can find an answer [1].
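For example, a rough sketch of the DDL-based alternative (untested on my
side; the schema, topic and broker address below are just placeholder
values):

// Define the sink table via DDL instead of registerTableSink
tableEnvironment.sqlUpdate(
    "CREATE TABLE kafka_target (" +
    "  f0 STRING" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'test-topic2'," +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
    "  'update-mode' = 'append'," +
    "  'format.type' = 'csv'" +
    ")");

// Then write into it with plain SQL
tableEnvironment.sqlUpdate("INSERT INTO kafka_target SELECT * FROM kafka_source");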

> It still doesn't work because it says for CSV the connector.type should be
> filesystem, not Kafka.

Can you post the full stack trace? As I’m not familiar with the Table API,
maybe Timo or Dawid know what’s going on here?
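
One blind guess in the meantime: as far as I know, the new Csv format
factory lives in the flink-csv module; if that dependency is missing from
the classpath, only the legacy CSV format is found, and that one works only
with the filesystem connector. If that's the case here, adding it might
help (version assumed to match your Flink version):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.0</version>
</dependency>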

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-

> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
> 
> Here is my updated code after digging through the source code (not sure if
> it is correct). It still doesn't work because it says for CSV the
> connector.type should be filesystem, not Kafka, but the documentation says
> Kafka is supported.
> 
> 
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> 
> public class Test {
> 
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
> 
>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
> 
>         tableEnvironment
>             .connect(
>                 new Kafka()
>                     .version("0.11")
>                     .topic("test-topic1")
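>                     // TODO: connection properties are presumably needed here
>                     // as well, e.g. .property("bootstrap.servers", "localhost:9092")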
>             )
>             .withFormat(new Csv())
>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>             .inAppendMode()
>             .createTemporaryTable("kafka_source");
> 
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
> 
>         tableEnvironment
>             .connect(
>                 new Kafka()
>                     .version("0.11")
>                     .topic("test-topic2")
>             )
>             .withFormat(new Csv())
>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>             .inAppendMode()
>             .createTemporaryTable("kafka_target");
> 
>         tableEnvironment.insertInto("kafka_target", resultTable);
> 
>         tableEnvironment.execute("Sample Job");
>     }
> }
> 
> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
> Hi Benchao,
> 
> Agreed, a ConsoleSink would be very useful, but that is not the only problem
> here. The documentation says to use tableEnv.registerTableSink all over the
> place
> (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink),
> yet that method is deprecated. So how do I add any other sink?
> 
> Thanks!
> 
> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
> Hi kant,
> 
> AFAIK, there is no "print to stdout" sink in the Table API right now; you
> can implement a custom sink following this doc [1].
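> 
> As a starting point, a very rough, untested sketch against the 1.10
> interfaces could look like the following (the class name is just an
> example; you would still register it via registerTableSink, which is
> deprecated in 1.10 but still works):
> 
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSink;
> import org.apache.flink.table.sinks.AppendStreamTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.types.Row;
> 
> public class PrintTableSink implements AppendStreamTableSink<Row> {
> 
>     private String[] fieldNames;
>     private TypeInformation<?>[] fieldTypes;
> 
>     @Override
>     public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
>         // print every appended row to stdout
>         return dataStream.print();
>     }
> 
>     @Override
>     public TypeInformation<Row> getOutputType() {
>         return new RowTypeInfo(fieldTypes, fieldNames);
>     }
> 
>     @Override
>     public String[] getFieldNames() {
>         return fieldNames;
>     }
> 
>     @Override
>     public TypeInformation<?>[] getFieldTypes() {
>         return fieldTypes;
>     }
> 
>     @Override
>     public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
>         // return a configured copy, as required by the TableSink contract
>         PrintTableSink sink = new PrintTableSink();
>         sink.fieldNames = fieldNames;
>         sink.fieldTypes = fieldTypes;
>         return sink;
>     }
> }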
> 
> IMO, an out-of-the-box print table sink would be very useful, and I've
> created an issue [2] to track this.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2] https://issues.apache.org/jira/browse/FLINK-16354
> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 2:30 AM:
> Hi, 
> 
> Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink; however, in my IDE the method shows as
> deprecated in Flink 1.10, so I am still not seeing a way to add a sink that
> can print to stdout. What sink should I use to print to stdout, and how do
> I add it without converting to a DataStream?
> 
> Thanks!
> 
> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s not 
> part of any public API.
> 
> You don’t have to convert a DataStream into a Table to read from Kafka with
> the Table API. I guess you could, if you used the DataStream API’s
> FlinkKafkaConsumer as documented here [1].
> 
> But you should be able to use the Kafka Table connector directly, as
> described in the docs [2][3].
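> 
> For example, roughly along these lines (an untested sketch; the version,
> topic and broker address are made-up values):
> 
> tableEnvironment
>     .connect(
>         new Kafka()
>             .version("universal")
>             .topic("test-topic1")
>             .property("bootstrap.servers", "localhost:9092")
>     )
>     .withFormat(new Csv())
>     .withSchema(new Schema().field("f0", DataTypes.STRING()))
>     .inAppendMode()
>     .createTemporaryTable("kafka_source");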
> 
> Piotrek
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> 
>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>> 
>> Also, why do I need to convert to a DataStream to print the rows of a
>> table? Why not have a print method on the Table itself?
>> 
>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>> Hi All,
>> 
>> Do I need to use the DataStream API or the Table API to construct sources?
>> I am just trying to read from Kafka and print it to the console. And yes,
>> I tried it with DataStreams and it works fine, but I want to do it using
>> the Table-related APIs. I don't see any documentation or a sample on how
>> to create a Kafka table source, or any other source, using the TableSource
>> APIs, so after some digging I wrote the following code. My ultimate goal
>> is to avoid the DataStream API as much as possible and just use the Table
>> API & SQL, but somehow I feel the Flink framework focuses more on
>> DataStream than on the SQL interface. Am I wrong? From the user's
>> perspective, wouldn't it make more sense to focus on SQL interfaces for
>> both streaming and batch?
>> 
>> 
>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>> import org.apache.flink.table.api.DataTypes;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableSchema;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sources.TableSource;
>> import org.apache.flink.types.Row;
>> 
>> import java.io.IOException;
>> import java.util.Properties;
>> 
>> public class Test {
>> 
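>>     // note: a non-static inner class, so each instance keeps a reference
>>     // to the enclosing Test object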
>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>         @Override
>>         public Row deserialize(byte[] message) throws IOException {
>>             return Row.of(new String(message));
>>         }
>>     }
>> 
>>     public static void main(String... args) throws Exception {
>>         Test test = new Test();
>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>                 .useBlinkPlanner()
>>                 .inStreamingMode()
>>                 .build();
>> 
>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>> 
>>         TableSource tableSource = test.getKafkaTableSource();
>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>> 
>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>> 
>>         streamExecutionEnvironment.execute("Sample Job");
>>     }
>> 
>>     public KafkaTableSource getKafkaTableSource() {
>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("group.id", "test");
>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>     }
>> }
>> 
>> I get the following error:
>> 
>> The program finished with the following exception:
>> 
>> The implementation of the FlinkKafkaConsumerBase is not serializable. The 
>> object probably contains or references non serializable fields.
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>> Test.main(Test.java:40)
>> 
>> The error seems to point at the line
>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>> and I am not sure why it is not able to serialize.
>> 
>> Thanks!
> 
> -- 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel: +86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
