
 
--- Original Message ---
From: "Zhenghua Gao"<doc...@gmail.com>
Sent: Friday, March 8, 2019, 10:57 AM
To: "user-zh"<user-zh@flink.apache.org>;
Subject: Re: sql-client batch mode issue


After some debugging, I found that currently, for batch mode, CsvTableFactory implements
BatchCompatibleTableSinkFactory rather than BatchTableSinkFactory:

/**
 * A CSV table factory.
 */
public class CsvTableFactory implements
   StreamTableSourceFactory<BaseRow>,
   BatchTableSourceFactory<BaseRow>,
   StreamTableSinkFactory<Object>,
   BatchCompatibleTableSinkFactory<Object> {

However, ExternalTableUtil.scala (line 111) looks up the matching factory via Java SPI,
and in batch mode it searches for a BatchTableSinkFactory:

/**
  * Converts an [[CatalogTable]] instance to a [[TableSink]] instance
  *
  * @param name          name of the table
  * @param externalTable the [[CatalogTable]] instance to convert
  * @param isStreaming   is in streaming mode or not.
  * @return
  */
def toTableSink(
    name: String,
    externalTable: CatalogTable,
    isStreaming: Boolean): TableSink[_] = {

  val tableProperties: TableProperties = generateTableProperties(name, externalTable, isStreaming)
  if (isStreaming) {
    val tableFactory = TableFactoryService.find(classOf[StreamTableSinkFactory[_]],
      getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap)
  } else {
    val tableFactory = TableFactoryService.find(classOf[BatchTableSinkFactory[_]],
      getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap)
  }
}

As a result, the lookup cannot match the CsvTableFactory, and it finally throws a
NoMatchingTableFactoryException.
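
To make the mismatch concrete, here is a tiny JDK-only sketch (the interface and class
names below are made up for illustration and are not Flink's actual TableFactoryService
code): a lookup that filters discovered factories by a required interface simply skips a
factory that only implements a sibling interface, and then reports that nothing matched.

import java.util.List;

// Toy stand-ins for the two sink-factory interfaces and the CSV factory.
interface BatchSinkFactory {}            // what the lookup asks for in batch mode
interface BatchCompatibleSinkFactory {}  // what the factory actually implements
class ToyCsvFactory implements BatchCompatibleSinkFactory {}

public class LookupMismatch {
    // Simplified lookup: keep only factories assignable to the required interface.
    static <T> T find(Class<T> required, List<Object> discovered) {
        return discovered.stream()
                .filter(required::isInstance)
                .map(required::cast)
                .findFirst()
                .orElseThrow(() -> new RuntimeException(
                        "no factory implements " + required.getName()));
    }

    public static void main(String[] args) {
        // Throws, because ToyCsvFactory is not a BatchSinkFactory --
        // the same shape of failure as the NoMatchingTableFactoryException above.
        find(BatchSinkFactory.class, List.of(new ToyCsvFactory()));
    }
}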

I'm not sure whether BatchCompatibleTableSinkFactory is meant to be the sink-factory
interface for the built-in connectors; if it is, having those connectors also implement
BatchTableSinkFactory would be one possible fix. Alternatively, the lookup in
TableFactoryUtil (lines 97-108) could be changed from BatchTableSinkFactory to
BatchCompatibleTableSinkFactory; you are welcome to submit a patch for this.
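
Continuing the toy example above (again made-up names, not a real patch against the Flink
code base), either direction would make the lookup succeed: the factory can also implement
the interface that batch mode searches for, or the lookup can be changed to search for the
compatible interface instead.

import java.util.List;

interface BatchSinkFactory {}
interface BatchCompatibleSinkFactory {}

// Option 1: the factory implements both interfaces; the lookup stays unchanged.
class ToyCsvFactoryBoth implements BatchCompatibleSinkFactory, BatchSinkFactory {}

public class LookupFixed {
    static <T> T find(Class<T> required, List<Object> discovered) {
        return discovered.stream()
                .filter(required::isInstance)
                .map(required::cast)
                .findFirst()
                .orElseThrow(() -> new RuntimeException(
                        "no factory implements " + required.getName()));
    }

    public static void main(String[] args) {
        // Option 1: the CSV factory now matches the batch-mode lookup.
        find(BatchSinkFactory.class, List.of(new ToyCsvFactoryBoth()));

        // Option 2: keep the factory as-is and ask for the compatible interface instead.
        class ToyCsvFactory implements BatchCompatibleSinkFactory {}
        find(BatchCompatibleSinkFactory.class, List.of(new ToyCsvFactory()));
    }
}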

The NPE you ran into earlier should come from the same problem: no usable sink is created
in batch mode, so the tableSink is null when it is later accessed.

On Thu, Mar 7, 2019 at 5:28 PM yuess_coder <642969...@qq.com> wrote:

> Below is my configuration file; could you help take a look? I'm also continuing to debug on my side.
>
>
> # Define tables here such as sources, sinks, views, or temporal tables.
>
>
> tables: [] # empty list
>
>
> # Define scalar, aggregate, or table functions here.
>
>
> functions: [] # empty list
>
>
>
>
> # Execution properties allow for changing the behavior of a table program.
>
>
> execution:
>   # 'batch' or 'streaming' execution
>   type: batch
>   # allow 'event-time' or only 'processing-time' in sources
>   time-characteristic: event-time
>   # interval in ms for emitting periodic watermarks
>   periodic-watermarks-interval: 200
>   # 'changelog' or 'table' presentation of results
>   result-mode: table
>   # maximum number of maintained rows in 'table' presentation of results
>   max-table-result-rows: 1000000
>   # parallelism of the program
>   parallelism: 1
>   # maximum parallelism
>   max-parallelism: 128
>   # minimum idle state retention in ms
>   min-idle-state-retention: 0
>   # maximum idle state retention in ms
>   max-idle-state-retention: 0
>   # controls how table programs are restarted in case of failures
>   restart-strategy:
>     # strategy type
>     # possible values are "fixed-delay", "failure-rate", "none", or
> "fallback" (default)
>     type: fallback
>
>
>
>
>
> #==============================================================================
> # Deployment properties
>
> #==============================================================================
>
>
> # Deployment properties allow for describing the cluster to which table
> # programs are submitted to.
>
>
> deployment:
>   # general cluster communication timeout in ms
>   response-timeout: 5000
>   # (optional) address from cluster to gateway
>   gateway-address: ""
>   # (optional) port from cluster to gateway
>   gateway-port: 0
>
>
>
>
>
> #==============================================================================
> # Catalog properties
>
> #==============================================================================
> #catalogs:
> #  - name: myhive
> #    catalog:
> #      type: hive
> #      connector:
> #        hive.metastore.uris: thrift://localhost:9083
> #      is-default: false
> #      default-database: default
>
>
>
> ------------------ Original Message ------------------
> From: "Zhenghua Gao"<doc...@gmail.com>;
> Sent: Thursday, March 7, 2019, 3:13 PM
> To: "user-zh"<user-zh@flink.apache.org>;
>
> Subject: Re: sql-client batch mode issue
>
>
>
> That means the tableSink is null at this point and needs some debugging.
> Could you also share your conf/sql-client-defaults.yaml configuration?
>
> On Thu, Mar 7, 2019 at 1:31 PM yuess_coder <642969...@qq.com> wrote:
>
> > The error is as follows:
> >
> >
> >
> >
> > java.lang.NullPointerException
> >   at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300)
> >   at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1203)
> >   at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1235)
> >   at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$10(LocalExecutor.java:605)
> >   at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:181)
> >   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:603)
> >   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:508)
> >   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:342)
> >   at org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:483)
> >   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:286)
> >   at java.util.Optional.ifPresent(Optional.java:159)
> >   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:174)
> >   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> >   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:107)
> >   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:202)
> >
> >
> >
> >
> >
> > val sinkFieldTypes = tableSink
> >       .getFieldTypes
> >       .map(_.toInternalType)
> >
> > It is null here, which is what throws the NullPointerException.
> >
> >
> > ------------------ Original Message ------------------
> > From: "Kurt Young"<ykt...@gmail.com>;
> > Sent: Thursday, March 7, 2019, 12:06 PM
> > To: "user-zh"<user-zh@flink.apache.org>;
> >
> > Subject: Re: sql-client batch mode issue
> >
> >
> >
> > Could you provide the detailed error message?
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Mar 7, 2019 at 9:23 AM yuess_coder <642969...@qq.com> wrote:
> >
> > > I ran the following statements in sql-client:
> > >
> > > create table csv_source1(
> > > id varchar,
> > > name varchar
> > > ) with (
> > > type ='csv',
> > > path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv'
> > > );
> > >
> > >
> > > create table csv_sink(
> > > id varchar,
> > > name varchar
> > > ) with (
> > > type ='csv',
> > > path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
> > > );
> > >
> > > insert into csv_sink  select t1.name,t1.id from csv_source1 t1
> > >
> > >
> > > I looked at line 1300 of org.apache.flink.table.api.TableEnvironment. If I change the
> > > execution type from batch to streaming, the statements run fine. Is it that batch mode
> > > currently cannot execute this SQL?
> > >
> > >
> > >
