Hi All,

Do I need to use the DataStream API or the Table API to construct sources? I
am just trying to read from Kafka and print it to the console. I tried it
with DataStreams and it works fine, but I want to do it using the
Table-related APIs. I don't see any documentation or sample showing how to
create a Kafka table source (or any other source) using the TableSource
APIs, so after some digging I wrote the code below. My ultimate goal is to
avoid the DataStream API as much as possible and just use Table API & SQL,
but somehow I feel the Flink framework focuses more on DataStream than on
the SQL interface. Am I wrong? From a user's perspective, wouldn't it make
more sense to focus on the SQL interfaces for both streaming and batch?
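
What I was actually hoping for was a way to declare the source purely in SQL
DDL, along the lines of the sketch below. This is only a guess: I am
assuming Flink 1.10-style connector property keys and the CSV format (which
needs the flink-csv dependency on the classpath), and I could not confirm
any of it from the docs. The topic, servers, and group id are the same
placeholders as in my code.

        // Hypothetical DDL-based registration of the Kafka source
        // (property keys are my assumption, not verified against docs).
        tableEnvironment.sqlUpdate(
                "CREATE TABLE kafka_source (" +
                "  f0 STRING" +
                ") WITH (" +
                "  'connector.type' = 'kafka'," +
                "  'connector.version' = 'universal'," +
                "  'connector.topic' = 'test-topic1'," +
                "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
                "  'connector.properties.group.id' = 'test'," +
                "  'format.type' = 'csv'" +
                ")");
        // The registered table could then be queried like any other table.
        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");

Since I couldn't find that documented anywhere, here is the programmatic
version I ended up with: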


import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Properties;

public class Test {

    public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }

    public static void main(String... args) throws Exception {
        Test test = new Test();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        TableSource tableSource = test.getKafkaTableSource();
        Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
        tableEnvironment.createTemporaryView("kafka_source", kafkaTable);

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
        tableEnvironment.toAppendStream(resultTable, Row.class).print();

        streamExecutionEnvironment.execute("Sample Job");
    }

    public KafkaTableSource getKafkaTableSource() {
        TableSchema tableSchema =
                TableSchema.builder().field("f0", DataTypes.STRING()).build();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        return new KafkaTableSource(tableSchema, "test-topic1", properties,
                new MyDeserializationSchema());
    }
}


I get the following error:

The program finished with the following exception:

The implementation of the FlinkKafkaConsumerBase is not serializable. The
object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.Iterator$class.foreach(Iterator.scala:891)
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.AbstractTraversable.map(Traversable.scala:104)
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
Test.main(Test.java:40)

The error seems to point at the line

tableEnvironment.toAppendStream(resultTable, Row.class).print();

but I am not sure what it is failing to serialize, or why.
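
The only explanation I can think of is that MyDeserializationSchema is a
non-static inner class, so it carries an implicit reference to the enclosing
Test instance, and Test itself is not Serializable; since the Kafka consumer
wraps the schema, that would explain the FlinkKafkaConsumerBase message.
Would declaring it as a static nested class, as in the sketch below, be the
right fix?

    public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        // A static nested class keeps no hidden reference to the enclosing
        // Test instance, so only the schema's own fields must be serializable.
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }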

Thanks!
