------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at Test.main(Test.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 34 more
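
The list of considered factories contains only the two CSV filesystem factories, which usually means the Kafka table connector (the flink-connector-kafka-0.11 dependency, plus flink-csv for the CSV format) is not on the job's classpath, so the 'kafka' connector.type has nothing to match against. With those jars available, the Kafka factory would also expect the broker addresses as connector properties. A minimal sketch of what the source definition could then look like (the bootstrap.servers value below is only a placeholder):

    tableEnvironment
        .connect(
            new Kafka()
                .version("0.11")
                .topic("test-topic1")
                // the Kafka factory needs to know where the brokers are
                .property("bootstrap.servers", "localhost:9092")
                .startFromEarliest()
        )
        .withFormat(new Csv())
        .withSchema(new Schema().field("f0", DataTypes.STRING()))
        .inAppendMode()
        .createTemporaryTable("kafka_source");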

On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> > Thanks for the pointer. Looks like the documentation says to use
> > tableEnv.registerTableSink, however in my IDE it shows the method is
> > deprecated in Flink 1.10.
>
> It looks like not all of the documentation was updated after the methods were
> deprecated. However, if you look at the Javadocs of the `registerTableSink`
> method, you can find an answer [1].
>
> > It still doesn't work because it says for CSV the connector.type should
> > be filesystem, not kafka.
>
> Can you post the full stack trace? As I’m not familiar with the Table API,
> maybe Timo or Dawid know what’s going on here?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>
> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>
> Here is my updated code after digging through the source code (not sure if
> it is correct). It still doesn't work because it says for CSV the
> connector.type should be filesystem, not kafka, but the documentation says
> it is supported.
>
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>
>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>         tableEnvironment
>             .connect(
>                 new Kafka()
>                     .version("0.11")
>                     .topic("test-topic1")
>             )
>             .withFormat(new Csv())
>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>             .inAppendMode()
>             .createTemporaryTable("kafka_source");
>
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>
>         tableEnvironment
>             .connect(
>                 new Kafka()
>                     .version("0.11")
>                     .topic("test-topic2")
>             )
>             .withFormat(new Csv())
>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>             .inAppendMode()
>             .createTemporaryTable("kafka_target");
>
>         tableEnvironment.insertInto("kafka_target", resultTable);
>
>         tableEnvironment.execute("Sample Job");
>     }
> }
>
>
> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Benchao,
>>
>> Agreed, a ConsoleSink is very useful, but that is not the only problem here.
>> The documentation says to use tableEnv.registerTableSink all over the place
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink),
>> however that method is deprecated. So how do I add any other sink?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>
>>> Hi kant,
>>>
>>> AFAIK, there is no "print to stdout" sink for the Table API yet; you can
>>> implement a custom sink following this doc [1].
>>>
>>> IMO, an out-of-the-box print table sink would be very useful, and I've
>>> created an issue [2] to track this.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
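>>>
>>> For what it's worth, a rough, untested sketch of what such a print sink could look like on
>>> 1.10, along the lines of what [1] describes (the exact set of methods to override may differ
>>> between versions, so treat it as a starting point rather than the definitive implementation):
>>>
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.datastream.DataStreamSink;
>>> import org.apache.flink.table.api.TableSchema;
>>> import org.apache.flink.table.sinks.AppendStreamTableSink;
>>> import org.apache.flink.table.sinks.TableSink;
>>> import org.apache.flink.types.Row;
>>>
>>> // Minimal append-only sink that writes every Row to stdout of the task managers.
>>> public class PrintTableSink implements AppendStreamTableSink<Row> {
>>>
>>>     private final TableSchema schema;
>>>
>>>     public PrintTableSink(TableSchema schema) {
>>>         this.schema = schema;
>>>     }
>>>
>>>     @Override
>>>     public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
>>>         // print() emits each record to stdout
>>>         return dataStream.print();
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getTableSchema() {
>>>         return schema;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<Row> getOutputType() {
>>>         return schema.toRowType();
>>>     }
>>>
>>>     @Override
>>>     public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
>>>         return new PrintTableSink(new TableSchema(fieldNames, fieldTypes));
>>>     }
>>> }
>>>
>>> Registering it would still go through the deprecated (but working) registerTableSink, or
>>> through a custom factory as described in [1].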
>>>
>>> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 2:30 AM:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the pointer. Looks like the documentation says to use
>>>> tableEnv.registerTableSink, however in my IDE it shows the method is
>>>> deprecated in Flink 1.10. So I am still not seeing a way to add a sink
>>>> that can print to stdout. What sink should I use to print to stdout, and
>>>> how do I add it without converting into a DataStream?
>>>>
>>>> Thanks!
>>>>
>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
>>>>> It’s not part of any public API.
>>>>>
>>>>> You don’t have to convert a DataStream into a Table to read from Kafka in
>>>>> the Table API. You could do that if you had used the DataStream API’s
>>>>> FlinkKafkaConsumer, as documented here [1].
>>>>>
>>>>> But you should be able to use the Kafka Table connector directly, as
>>>>> described in the docs [2][3].
>>>>>
>>>>> Piotrek
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>
>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>> Also, why do I need to convert to a DataStream to print the rows of a
>>>>> table? Why not have a print method on the Table itself?
>>>>>
>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Do I need to use the DataStream API or the Table API to construct sources? I am just
>>>>>> trying to read from Kafka and print it to the console. And yes, I tried it with
>>>>>> DataStreams and it works fine, but I want to do it using the Table-related APIs. I don't
>>>>>> see any documentation or a sample on how to create a Kafka table source, or any other
>>>>>> source, using the TableSource APIs, so after some digging I wrote the following code. My
>>>>>> ultimate goal is to avoid the DataStream API as much as possible and just use the Table
>>>>>> API & SQL, but somehow I feel the Flink framework focuses more on DataStream than on the
>>>>>> SQL interface. Am I wrong? From the user perspective, wouldn't it make more sense to
>>>>>> focus on SQL interfaces for both streaming and batch?
>>>>>>
>>>>>>
>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>> import org.apache.flink.types.Row;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.util.Properties;
>>>>>>
>>>>>> public class Test {
>>>>>>
>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>         @Override
>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>             return Row.of(new String(message));
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static void main(String... args) throws Exception {
>>>>>>         Test test = new Test();
>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>                 .useBlinkPlanner()
>>>>>>                 .inStreamingMode()
>>>>>>                 .build();
>>>>>>
>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>
>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>
>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>
>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>     }
>>>>>>
>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>         Properties properties = new Properties();
>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>         properties.setProperty("group.id", "test");
>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I get the following error:
>>>>>>
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
>>>>>>
>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>> Test.main(Test.java:40)
>>>>>>
>>>>>> The error seems to be on the line
>>>>>>
>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>
>>>>>> and I am not sure why it is not able to serialize.
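>>>>>>
>>>>>> One possibility: MyDeserializationSchema is declared as a non-static inner class, so each
>>>>>> instance captures the enclosing Test object, which is not serializable, and that reference
>>>>>> ends up inside the Kafka consumer. Declaring it as a static nested class should avoid
>>>>>> capturing the outer instance, e.g.:
>>>>>>
>>>>>>     // Static nested class: no hidden reference to Test.this
>>>>>>     public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>         @Override
>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>             return Row.of(new String(message));
>>>>>>         }
>>>>>>     }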
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
