???? ????????
---????????--- ??????: "Zhenghua Gao"<doc...@gmail.com> ????????: 2019??3??8??(??????) ????10:57 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: sql-client batch ???????????? ??Debug????????????Batch????????????????????????????CsvTableFactory?????? BatchCompatibleTableSinkFactory ?????? BatchTableSinkFactory?? /** * A CSV table factory. */ public class CsvTableFactory implements StreamTableSourceFactory<BaseRow>, BatchTableSourceFactory<BaseRow>, StreamTableSinkFactory<Object>, BatchCompatibleTableSinkFactory<Object> { ???? ExternalTableUtil.scala(line 111) ??????Java SPI ??????????Factory???????????????????????? BatchTableSinkFactory?? /** * Converts an [[CatalogTable]] instance to a [[TableSink]] instance * * @param name name of the table * @param externalTable the [[CatalogTable]] instance to convert * @param isStreaming is in streaming mode or not. * @return */ def toTableSink( name: String, externalTable: CatalogTable, isStreaming: Boolean): TableSink[_] = { val tableProperties: TableProperties = generateTableProperties(name, externalTable, isStreaming) if (isStreaming) { val tableFactory = TableFactoryService.find(classOf[StreamTableSinkFactory[_]], getToolDescriptor(getStorageType(name, tableProperties), tableProperties)) tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap) } else { val tableFactory = TableFactoryService.find(classOf[BatchTableSinkFactory[_]], getToolDescriptor(getStorageType(name, tableProperties), tableProperties)) tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap) } } ?????????????????????????? CsvTableFactory?????????????? NoMatchingTableFactoryException???????? ???????? BatchCompatibleTableSinkFactory ??????????????????connector??????(????????)????????????????????????????????????????????????????????????????????fix?? ???? TableFactoryUtil(line97-108)???????????????? BatchTableSinkFactory ?????? BatchCompatibleTableSinkFactory ??????????????????????????????????patch?? 
????????????????????????NPE?????????????????????????????????????????????????????? On Thu, Mar 7, 2019 at 5:28 PM yuess_coder <642969...@qq.com> wrote: > ??????????????????????????????????????????????????????????debug???? > > > # Define tables here such as sources, sinks, views, or temporal tables. > > > tables: [] # empty list > > > # Define scalar, aggregate, or table functions here. > > > functions: [] # empty list > > > > > # Execution properties allow for changing the behavior of a table program. > > > execution: > # 'batch' or 'streaming' execution > type: batch > # allow 'event-time' or only 'processing-time' in sources > time-characteristic: event-time > # interval in ms for emitting periodic watermarks > periodic-watermarks-interval: 200 > # 'changelog' or 'table' presentation of results > result-mode: table > # maximum number of maintained rows in 'table' presentation of results > max-table-result-rows: 1000000 > # parallelism of the program > parallelism: 1 > # maximum parallelism > max-parallelism: 128 > # minimum idle state retention in ms > min-idle-state-retention: 0 > # maximum idle state retention in ms > max-idle-state-retention: 0 > # controls how table programs are restarted in case of a failures > restart-strategy: > # strategy type > # possible values are "fixed-delay", "failure-rate", "none", or > "fallback" (default) > type: fallback > > > > > > #============================================================================== > # Deployment properties > > #============================================================================== > > > # Deployment properties allow for describing the cluster to which table > # programs are submitted to. 
> > > deployment: > # general cluster communication timeout in ms > response-timeout: 5000 > # (optional) address from cluster to gateway > gateway-address: "" > # (optional) port from cluster to gateway > gateway-port: 0 > > > > > > #============================================================================== > # Catalog properties > > #============================================================================== > #catalogs: > # - name: myhive > # catalog: > # type: hive > # connector: > # hive.metastore.uris: thrift://localhost:9083 > # is-default: false > # default-database: default > > > > ------------------ 原始邮件 ------------------ > 发件人: "Zhenghua Gao"<doc...@gmail.com>; > 发送时间: 2019年3月7日(星期四) 下午3:13 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: Re: sql-client batch ???????????? > > > > ?????? tableSink ????????????????null????????debug?????? > ???????????????? conf/sql-client-defaults.yaml ???? > > On Thu, Mar 7, 2019 at 1:31 PM yuess_coder <642969...@qq.com> wrote: > > > ?????????? 
> > > > > > > > > > java.lang.NullPointerException > > at > > > org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300) > > at > > > org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1203) > > at > > > org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1235) > > at > > > org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$10(LocalExecutor.java:605) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:181) > > at > > > org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:603) > > at > > > org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:508) > > at > > > org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:342) > > at > > > org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:483) > > at > > > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:286) > > at java.util.Optional.ifPresent(Optional.java:159) > > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:174) > > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:107) > > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:202) > > > > > > > > > > > > val sinkFieldTypes = tableSink > > .getFieldTypes > > .map(_.toInternalType) > > > > ????????null???????????? > > > > > > ------------------ ???????? ------------------ > > ??????: "Kurt Young"<ykt...@gmail.com>; > > ????????: 2019??3??7??(??????) ????12:06 > > ??????: "user-zh"<user-zh@flink.apache.org>; > > > > ????: Re: sql-client batch ???????????? > > > > > > > > ?????????????????????????????? 
> > > > Best, > > Kurt > > > > > > On Thu, Mar 7, 2019 at 9:23 AM yuess_coder <642969...@qq.com> wrote: > > > > > ????sql-client?????????? > > > > > > create table csv_source1( > > > id varchar, > > > name varchar > > > ) with ( > > > type ='csv', > > > path = > > '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv' > > > ); > > > > > > > > > create table csv_sink( > > > id varchar, > > > name varchar > > > ) with ( > > > type ='csv', > > > path = > > '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv' > > > ); > > > > > > insert into csv_sink select t1.name,t1.id from csv_source1 t1 > > > > > > > > > ??????org.apache.flink.table.api.TableEnvironment??????1300????????????execution > > > batch????????????execution > > > streaming????????????????????????batch????????????sql? > > > > > > > > >