Re: sql-client batch mode execution error

2019-03-08 Post by yuess_coder
 


 
------
From: "Zhenghua Gao"
Sent: Friday, March 8, 2019 10:57
To: "user-zh";
Subject: Re: sql-client batch


Debugging in batch mode shows that CsvTableFactory implements
BatchCompatibleTableSinkFactory rather than BatchTableSinkFactory:

/**
 * A CSV table factory.
 */
public class CsvTableFactory implements
        StreamTableSourceFactory,
        BatchTableSourceFactory,
        StreamTableSinkFactory,
        BatchCompatibleTableSinkFactory {
    // ...
}

ExternalTableUtil.scala (line 111), however, uses Java SPI to look up a factory
that implements BatchTableSinkFactory:

/**
  * Converts a [[CatalogTable]] instance to a [[TableSink]] instance.
  *
  * @param name          name of the table
  * @param externalTable the [[CatalogTable]] instance to convert
  * @param isStreaming   whether the job runs in streaming mode
  * @return the created [[TableSink]]
  */
def toTableSink(
    name: String,
    externalTable: CatalogTable,
    isStreaming: Boolean): TableSink[_] = {

  val tableProperties: TableProperties =
    generateTableProperties(name, externalTable, isStreaming)
  if (isStreaming) {
    val tableFactory =
      TableFactoryService.find(classOf[StreamTableSinkFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap)
  } else {
    // Batch mode: only factories implementing BatchTableSinkFactory can match here.
    val tableFactory =
      TableFactoryService.find(classOf[BatchTableSinkFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap)
  }
}

CsvTableFactory does not implement BatchTableSinkFactory. The lookup therefore
cannot find it and throws a NoMatchingTableFactoryException.
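The lookup failure can be illustrated with a small, self-contained Java sketch. The types below are simplified, hypothetical stand-ins that only borrow names from this thread: TableFactory, BatchTableSinkFactory, BatchCompatibleTableSinkFactory, CsvTableFactory, NoMatchingFactoryException and the find helper. They are not Flink's real types. The sketch assumes the lookup simply keeps the discovered factories that implement the requested interface. The real TableFactoryService also matches on properties. Under that assumption, a factory that implements only BatchCompatibleTableSinkFactory is never returned when BatchTableSinkFactory is requested.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the factory types named in the thread.
// They are NOT Flink's real interfaces.
class SinkFactoryLookupSketch {

    interface TableFactory {}
    interface BatchTableSinkFactory extends TableFactory {}
    interface BatchCompatibleTableSinkFactory extends TableFactory {}

    // Same shape as CsvTableFactory in the thread: it implements the
    // "compatible" batch sink interface, not BatchTableSinkFactory.
    static class CsvTableFactory implements BatchCompatibleTableSinkFactory {}

    // Stand-in for NoMatchingTableFactoryException.
    static class NoMatchingFactoryException extends RuntimeException {
        NoMatchingFactoryException(String message) {
            super(message);
        }
    }

    // Keeps only the discovered factories that implement the requested interface
    // and returns the first one, or throws if none matches.
    static <T extends TableFactory> T find(Class<T> required, List<TableFactory> discovered) {
        List<T> matches = new ArrayList<>();
        for (TableFactory factory : discovered) {
            if (required.isInstance(factory)) {
                matches.add(required.cast(factory));
            }
        }
        if (matches.isEmpty()) {
            throw new NoMatchingFactoryException(
                "No factory implements " + required.getSimpleName());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // In the real code this list would come from Java SPI (java.util.ServiceLoader).
        List<TableFactory> discovered = List.of(new CsvTableFactory());
        try {
            find(BatchTableSinkFactory.class, discovered);
        } catch (NoMatchingFactoryException e) {
            // Prints: No factory implements BatchTableSinkFactory
            System.out.println(e.getMessage());
        }
    }
}
```

Requesting BatchCompatibleTableSinkFactory with the same list succeeds. This shows that the mismatch lies in which interface is requested, not in whether the factory was discovered.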

BatchCompatibleTableSinkFactory [...] connectors [...] (...); this needs a fix.
 
TableFactoryUtil (lines 97-108) handles both BatchTableSinkFactory and
BatchCompatibleTableSinkFactory; you can write a patch along the same lines.
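One way such a patch could look is sketched below. It assumes the batch lookup should prefer a BatchTableSinkFactory and fall back to a BatchCompatibleTableSinkFactory. All types and the firstMatching and findBatchSinkFactory helpers are hypothetical stand-ins. They are not the contents of TableFactoryUtil and not Flink's real interfaces.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins; not Flink's real interfaces or TableFactoryUtil.
class CompatibleSinkLookupSketch {

    interface TableFactory {}
    interface BatchTableSinkFactory extends TableFactory {}
    interface BatchCompatibleTableSinkFactory extends TableFactory {}

    static class CsvTableFactory implements BatchCompatibleTableSinkFactory {}
    static class OtherBatchFactory implements BatchTableSinkFactory {}

    // Returns the first discovered factory that implements `required`, if any.
    static <T extends TableFactory> Optional<T> firstMatching(
            Class<T> required, List<TableFactory> discovered) {
        return discovered.stream()
            .filter(required::isInstance)
            .map(required::cast)
            .findFirst();
    }

    // Prefer BatchTableSinkFactory, then fall back to BatchCompatibleTableSinkFactory,
    // so that a factory like CsvTableFactory is still found in batch mode.
    static TableFactory findBatchSinkFactory(List<TableFactory> discovered) {
        Optional<BatchTableSinkFactory> batch =
            firstMatching(BatchTableSinkFactory.class, discovered);
        if (batch.isPresent()) {
            return batch.get();
        }
        return firstMatching(BatchCompatibleTableSinkFactory.class, discovered)
            .orElseThrow(() -> new IllegalStateException(
                "No BatchTableSinkFactory or BatchCompatibleTableSinkFactory found"));
    }

    public static void main(String[] args) {
        TableFactory factory = findBatchSinkFactory(List.of(new CsvTableFactory()));
        // Prints: CsvTableFactory
        System.out.println(factory.getClass().getSimpleName());
    }
}
```

If only CsvTableFactory is discovered, the fallback returns it. If a factory implementing BatchTableSinkFactory is also present, that one wins, so existing batch sinks keep their current behavior.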

As for the NPE, [...]

On Thu, Mar 7, 2019 at 5:28 PM yuess_coder <642969...@qq.com> wrote:


Re: sql-client batch mode execution error

2019-03-07 Post by yuess_coder
[...] debug

# Define tables here such as sources, sinks, views, or temporal tables.
tables: [] # empty list

# Define scalar, aggregate, or table functions here.
functions: [] # empty list

# Execution properties allow for changing the behavior of a table program.
execution:
  # 'batch' or 'streaming' execution
  type: batch
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 100
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # controls how table programs are restarted in case of failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback

#==
# Deployment properties
#==

# Deployment properties allow for describing the cluster to which table
# programs are submitted.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0

#==
# Catalog properties
#==
#catalogs:
#  - name: myhive
#catalog:
#  type: hive
#  connector:
#hive.metastore.uris: thrift://localhost:9083
#  is-default: false
#  default-database: default



-------- Original message --------
From: "Zhenghua Gao";
Sent: Thursday, March 7, 2019 3:13
To: "user-zh";
Subject: Re: sql-client batch



The tableSink is null [...] debug [...]
Could you share your conf/sql-client-defaults.yaml?
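The symptom is a bare NullPointerException rather than a lookup error. The Java sketch below shows why, assuming the sink reference simply arrives as null where its field types are read (the tableSink.getFieldTypes call in the quoted message). SinkStub and sinkFieldTypes are hypothetical names used only for illustration. The sketch shows how an explicit Objects.requireNonNull check would name the affected table instead of failing without context.

```java
import java.util.Objects;

// Hypothetical stand-in for a table sink; not Flink's TableSink interface.
class NullSinkCheckSketch {

    interface SinkStub {
        String[] getFieldTypes();
    }

    // Calling getFieldTypes() on a null sink throws a NullPointerException that
    // does not say which table's sink is missing. Checking first keeps the same
    // exception type but puts the table name into the message.
    static String[] sinkFieldTypes(String tableName, SinkStub sink) {
        Objects.requireNonNull(sink,
            "No table sink could be created for table '" + tableName + "'");
        return sink.getFieldTypes();
    }

    public static void main(String[] args) {
        try {
            sinkFieldTypes("csv_sink", null);
        } catch (NullPointerException e) {
            // Prints: No table sink could be created for table 'csv_sink'
            System.out.println(e.getMessage());
        }
    }
}
```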

On Thu, Mar 7, 2019 at 1:31 PM yuess_coder <642969...@qq.com> wrote:

> [...]
>
> java.lang.NullPointerException
>   at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300)
>   at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1203)
>   at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1235)
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$10(LocalExecutor.java:605)
>   at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:181)
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:603)
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:508)
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:342)
>   at org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:483)
>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:286)
>   at java.util.Optional.ifPresent(Optional.java:159)
>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:174)
>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:107)
>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:202)
>
> val sinkFieldTypes = tableSink
>   .getFieldTypes
>   .map(_.toInternalType)
>
> Here, tableSink is null.
>
> -------- Original message --------
> From: "Kurt Young";
> Sent: Thursday, March 7, 2019 12:06
> To: "user-zh";
> Subject: Re: sql-client batch
>
> [...]
>
> Best,
> Kurt
>
>
> On Thu, Mar 7, 2019 at 9:23 AM yuess_coder <642969...@qq.com> wrote:
> >

sql-client batch mode execution error

2019-03-06 Post by yuess_coder
I submitted the following job in sql-client:


create table csv_source1(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv'
);




create table csv_sink(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
);


insert into csv_sink  select t1.name,t1.id from csv_source1 t1




The error is a NullPointerException at line 1300 of
org.apache.flink.table.api.TableEnvironment. It fails when the execution type is
batch, but works when the execution type is streaming. How can I run this SQL in
batch mode?

sql-client [...] flink [...]

2019-03-03 Post by yuess_coder