回复:sql-client batch 模式执行报错
------ ??: "Zhenghua Gao" : 2019??3??8??(??) 10:57 ??: "user-zh"; : Re: sql-client batch ??DebugBatchCsvTableFactory?? BatchCompatibleTableSinkFactory ?? BatchTableSinkFactory?? /** * A CSV table factory. */ public class CsvTableFactory implements StreamTableSourceFactory, BatchTableSourceFactory, StreamTableSinkFactory, BatchCompatibleTableSinkFactory { ExternalTableUtil.scala(line 111) ??Java SPI ??Factory BatchTableSinkFactory?? /** * Converts an [[CatalogTable]] instance to a [[TableSink]] instance * * @param name name of the table * @param externalTable the [[CatalogTable]] instance to convert * @param isStreaming is in streaming mode or not. * @return */ def toTableSink( name: String, externalTable: CatalogTable, isStreaming: Boolean): TableSink[_] = { val tableProperties: TableProperties = generateTableProperties(name, externalTable, isStreaming) if (isStreaming) { val tableFactory = TableFactoryService.find(classOf[StreamTableSinkFactory[_]], getToolDescriptor(getStorageType(name, tableProperties), tableProperties)) tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap) } else { val tableFactory = TableFactoryService.find(classOf[BatchTableSinkFactory[_]], getToolDescriptor(getStorageType(name, tableProperties), tableProperties)) tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap) } } ?? CsvTableFactory?? NoMatchingTableFactoryException BatchCompatibleTableSinkFactory ??connector??()fix?? TableFactoryUtil(line97-108) BatchTableSinkFactory ?? BatchCompatibleTableSinkFactory ??patch?? NPE?? On Thu, Mar 7, 2019 at 5:28 PM yuess_coder <642969...@qq.com> wrote: > ??debug > > > # Define tables here such as sources, sinks, views, or temporal tables. > > > tables: [] # empty list > > > # Define scalar, aggregate, or table functions here. > > > functions: [] # empty list > > > > > # Execution properties allow for changing the behavior of a table program. 
> > > execution: > # 'batch' or 'streaming' execution > type: batch > # allow 'event-time' or only 'processing-time' in sources > time-characteristic: event-time > # interval in ms for emitting periodic watermarks > periodic-watermarks-interval: 200 > # 'changelog' or 'table' presentation of results > result-mode: table > # maximum number of maintained rows in 'table' presentation of results > max-table-result-rows: 100 > # parallelism of the program > parallelism: 1 > # maximum parallelism > max-parallelism: 128 > # minimum idle state retention in ms > min-idle-state-retention: 0 > # maximum idle state retention in ms > max-idle-state-retention: 0 > # controls how table programs are restarted in case of a failures > restart-strategy: > # strategy type > # possible values are "fixed-delay", "failure-rate", "none", or > "fallback" (default) > type: fallback > > > > > > #== > # Deployment properties > > #== > > > # Deployment properties allow for describing the cluster to which table > # programs are submitted to. > > > deployment: > # general cluster communication timeout in ms > response-timeout: 5000 > # (optional) address from cluster to gateway > gateway-address: "" > # (optional) port from cluster to gateway > gateway-port: 0 > > > > > > #== > # Catalog properties > > #== > #catalogs: > # - name: myhive > #catalog: > # type: hive > # connector: > #hive.metastore.uris: thrift://localhost:9083 > # is-default: false > # default-database: default > > > > -- -- > ??: "Zhenghua Gao"; > : 2019??3??7??(??) 3:13 > ??: "user-zh"; > > : Re: sql-client b
回复: sql-client batch 模式执行报错
??debug # Define tables here such as sources, sinks, views, or temporal tables. tables: [] # empty list # Define scalar, aggregate, or table functions here. functions: [] # empty list # Execution properties allow for changing the behavior of a table program. execution: # 'batch' or 'streaming' execution type: batch # allow 'event-time' or only 'processing-time' in sources time-characteristic: event-time # interval in ms for emitting periodic watermarks periodic-watermarks-interval: 200 # 'changelog' or 'table' presentation of results result-mode: table # maximum number of maintained rows in 'table' presentation of results max-table-result-rows: 100 # parallelism of the program parallelism: 1 # maximum parallelism max-parallelism: 128 # minimum idle state retention in ms min-idle-state-retention: 0 # maximum idle state retention in ms max-idle-state-retention: 0 # controls how table programs are restarted in case of a failures restart-strategy: # strategy type # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default) type: fallback #== # Deployment properties #== # Deployment properties allow for describing the cluster to which table # programs are submitted to. deployment: # general cluster communication timeout in ms response-timeout: 5000 # (optional) address from cluster to gateway gateway-address: "" # (optional) port from cluster to gateway gateway-port: 0 #== # Catalog properties #== #catalogs: # - name: myhive #catalog: # type: hive # connector: #hive.metastore.uris: thrift://localhost:9083 # is-default: false # default-database: default -- -- ??: "Zhenghua Gao"; : 2019??3??7??(??) 3:13 ??: "user-zh"; : Re: sql-client batch ?? tableSink nulldebug?? ???? conf/sql-client-defaults.yaml On Thu, Mar 7, 2019 at 1:31 PM yuess_coder <642969...@qq.com> wrote: > ?? 
> > > > > java.lang.NullPointerException > at > org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300) > at > org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1203) > at > org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1235) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$10(LocalExecutor.java:605) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:181) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:603) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:508) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:342) > at > org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:483) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:286) > at java.util.Optional.ifPresent(Optional.java:159) > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:174) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:107) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:202) > > > > > > val sinkFieldTypes = tableSink > .getFieldTypes > .map(_.toInternalType) > > null > > > -- -- > ??: "Kurt Young"; > : 2019??3??7??(??) 12:06 > ??: "user-zh"; > > : Re: sql-client batch > > > > ?? > > Best, > Kurt > > > On Thu, Mar 7, 2019 at 9:23 AM yuess_coder <642969...@qq.com> wrote: > > > sql-client?? 
> > > > create table csv_source1( > > id varchar, > > name varchar > > ) with ( > > type ='csv', > > path = > '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv' > > ); > > > > > > create table csv_sink( > > id varchar, > > name varchar > > ) with ( > > type ='csv', > > path = > '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv' > > ); > > > > insert into csv_sink select t1.name,t1.id from csv_source1 t1 > > > > > > 错误是org.apache.flink.table.api.TableEnvironment这个类1300行空指针,用execution > > batch模式不行,用execution > > streaming模式是可以的。请问下才能batch模式执行这个sql? > > > > > >
sql-client batch 模式执行报错
我在sql-client提交任务: create table csv_source1( id varchar, name varchar ) with ( type ='csv', path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv' ); create table csv_sink( id varchar, name varchar ) with ( type ='csv', path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv' ); insert into csv_sink select t1.name,t1.id from csv_source1 t1 错误是org.apache.flink.table.api.TableEnvironment这个类第1300行抛出空指针,用execution batch模式不行,用execution streaming模式是可以的。请问下怎样才能用batch模式执行这个sql?