Hi,

You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s not 
part of any public API.

You don’t have to convert a DataStream into a Table to read from Kafka in the 
Table API. You could, if you used the DataStream API’s FlinkKafkaConsumer as 
documented in [1].

But you should be able to use the Kafka Table connector directly, as 
described in the docs [2][3].
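
As a rough illustration of the connector route, here is a minimal sketch of 
a DDL statement for a Kafka-backed table, reusing the topic, servers and 
group id from your code. The property keys are the ones I recall from the 
1.10 connector page [3], and the CSV format and the helper name 
kafkaSourceDdl are my own assumptions, so please check them against the docs 
for your version. The snippet only assembles the statement; passing it to 
the table environment is shown in the comments.

```java
class KafkaSourceDdl {

    // Hypothetical helper: builds a CREATE TABLE statement for a Kafka source.
    // Property keys assume the Flink 1.10 connector properties; verify in [3].
    static String kafkaSourceDdl(String table, String topic,
                                 String servers, String groupId) {
        return String.join("\n",
            "CREATE TABLE " + table + " (",
            "  f0 STRING",
            ") WITH (",
            "  'connector.type' = 'kafka',",
            "  'connector.version' = 'universal',",
            "  'connector.topic' = '" + topic + "',",
            "  'connector.properties.bootstrap.servers' = '" + servers + "',",
            "  'connector.properties.group.id' = '" + groupId + "',",
            "  'connector.startup-mode' = 'earliest-offset',",
            "  'format.type' = 'csv',",   // assumed format, needs flink-csv
            "  'update-mode' = 'append'",
            ")");
    }

    public static void main(String[] args) {
        String ddl = kafkaSourceDdl(
            "kafka_source", "test-topic1", "localhost:9092", "test");
        // In a Flink 1.10 job the statement would be registered with
        //   tableEnvironment.sqlUpdate(ddl);
        // and then queried with
        //   tableEnvironment.sqlQuery("select * from kafka_source");
        System.out.println(ddl);
    }
}
```

With the table declared this way there is no need to instantiate 
KafkaTableSource or a custom DeserializationSchema yourself.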

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
> 
> Also why do I need to convert to DataStream to print the rows of a table? Why 
> not have a print method in the Table itself?
> 
> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
> Hi All,
> 
> Do I need to use the DataStream API or the Table API to construct sources? I 
> am just trying to read from Kafka and print it to the console. I tried it 
> with DataStreams and it works fine, but I want to do it using the Table 
> APIs. I don't see any documentation or sample on how to create a Kafka table 
> source, or any other source, using the TableSource APIs, so after some 
> digging I wrote the following code. My ultimate goal is to avoid the 
> DataStream API as much as possible and just use Table API & SQL, but I feel 
> that the Flink framework focuses more on DataStream than on the SQL 
> interface. Am I wrong? From the user perspective, wouldn't it make more 
> sense to focus on SQL interfaces for both streaming and batch?
> 
> 
> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sources.TableSource;
> import org.apache.flink.types.Row;
> 
> import java.io.IOException;
> import java.util.Properties;
> 
> public class Test {
> 
>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>         @Override
>         public Row deserialize(byte[] message) throws IOException {
>             return Row.of(new String(message));
>         }
>     }
> 
>     public static void main(String... args) throws Exception {
>         Test test = new Test();
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
> 
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnvironment =
>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
> 
>         TableSource tableSource = test.getKafkaTableSource();
>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
> 
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
> 
>         streamExecutionEnvironment.execute("Sample Job");
>     }
> 
>     public KafkaTableSource getKafkaTableSource() {
>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("group.id", "test");
>         return new KafkaTableSource(tableSchema, "test-topic1", properties,
>                 new MyDeserializationSchema());
>     }
> }
> 
> I get the following error:
> 
> The program finished with the following exception:
> 
> The implementation of the FlinkKafkaConsumerBase is not serializable. The 
> object probably contains or references non serializable fields.
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> Test.main(Test.java:40)
> 
> The error seems to be on the line
> tableEnvironment.toAppendStream(resultTable, Row.class).print();
> and I am not sure why it cannot be serialized.
> 
> Thanks!
