Hi Benchao,

Agreed a ConsoleSink is very useful but that is not the only problem here.
Documentation says use  tableEnv.registerTableSink all over the place
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
however that function is deprecated. so how do I add any other Sink?

Thanks!





On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:

> Hi kant,
>
> AFAIK, there is no "print to stdout" sink for Table API now, you can
> implement one custom sink following this doc[1].
>
> IMO, an out-of-box print table sink is very useful, and I've created an
> issue[2] to track this.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2] https://issues.apache.org/jira/browse/FLINK-16354
>
> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:
>
>> Hi,
>>
>> Thanks for the pointer. Looks like the documentation says to use
>> tableEnv.registerTableSink however in my IDE it shows the method is
>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
>> can print to stdout? what sink should I use to print to stdout and how do I
>> add it without converting into DataStream?
>>
>> Thanks!
>>
>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
>>> not part of any public API.
>>>
>>> You don’t have to convert DataStream into Table to read from Kafka in
>>> Table API. I guess you could, if you had used DataStream API’s
>>> FlinkKafkaConsumer as it’s documented here [1].
>>>
>>> But you should be able to use Kafka Table connector directly, as it is
>>> described in the docs [2][3].
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>
>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Also why do I need to convert to DataStream to print the rows of a
>>> table? Why not have a print method in the Table itself?
>>>
>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Do I need to use DataStream API or Table API to construct sources? I am
>>>> just trying to read from Kafka and print it to console. And yes I tried it
>>>> with datastreams and it works fine but I want to do it using Table related
>>>> APIs. I don't see any documentation or a sample on how to create Kafka
>>>> table source or any other source using Table Source API's so after some
>>>> digging I wrote the following code. My ultimate goal is to avoid Datastream
>>>> API as much as possible and just use Table API & SQL but somehow I feel the
>>>> Flink framework focuses on DataStream than the SQL interface. am I wrong?
>>>> From the user perspective wouldn't it make more sense to focus on SQL
>>>> interfaces for both streaming and batch?
>>>>
>>>>
>>>> import 
>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>> import 
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>> import org.apache.flink.table.api.DataTypes;
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.TableSchema;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.table.sources.TableSource;
>>>> import org.apache.flink.types.Row;
>>>>
>>>> import java.io.IOException;
>>>> import java.util.Properties;
>>>>
>>>> public class Test {
>>>>
>>>>     public class MyDeserializationSchema extends 
>>>> AbstractDeserializationSchema<Row> {
>>>>         @Override
>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>             return Row.of(new String(message));
>>>>         }
>>>>     }
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>         Test test = new Test();
>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>                 .useBlinkPlanner()
>>>>                 .inStreamingMode()
>>>>                 .build();
>>>>
>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         StreamTableEnvironment tableEnvironment = 
>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>
>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>
>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>> kafka_source");
>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>
>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>     }
>>>>
>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", 
>>>> DataTypes.STRING()).build();
>>>>         Properties properties = new Properties();
>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>         properties.setProperty("group.id", "test");
>>>>         return new KafkaTableSource(tableSchema, "test-topic1", 
>>>> properties, new MyDeserializationSchema());
>>>>     }
>>>> }
>>>>
>>>>
>>>> I get the following error
>>>>
>>>> The program finished with the following exception:
>>>>
>>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>>> The object probably contains or references non serializable fields.
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>
>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>
>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>
>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>
>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>
>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>> Test.main(Test.java:40)
>>>>
>>>> The error seems to be on the line
>>>>
>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>
>>>> and I am not sure why it is not able to serialize?
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>

Reply via email to