flink cdc data synchronization

2021-02-05 Thread
hello, I've run into a problem. When syncing data with flink cdc I set snapshot.mode to schema_only, but every time I restart the job it starts consuming from the latest position. What do I need to do so that it resumes from the point where the job was last stopped? Also, I set the server_id via MySQLSource.builder().serverId(123456), but judging from the printed data it did not take effect.
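A minimal sketch of how this is commonly handled, assuming the flink-cdc-connectors 1.x MySQLSource builder API referenced above (connection settings are placeholders): the binlog offset lives in Flink checkpoint state, so resuming from the last stop means restarting the job from a retained checkpoint or savepoint rather than setting a connector option; and the server_id field printed inside Debezium change records identifies the MySQL server that wrote the binlog event, which may be why the configured client id does not appear in the output.

import java.util.Properties;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlCdcResumeSketch {
    public static void main(String[] args) throws Exception {
        // snapshot.mode=schema_only, as in the question, passed through to Debezium
        Properties debeziumProps = new Properties();
        debeziumProps.setProperty("snapshot.mode", "schema_only");

        SourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("localhost")   // placeholder connection settings
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("user")
                .password("pass")
                .serverId(123456)        // the replication client id from the question
                .debeziumProperties(debeziumProps)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The binlog offset is stored in Flink state at each checkpoint; to resume
        // from where a stopped job left off, restart from a retained checkpoint or
        // savepoint (flink run -s <path> ...) instead of relying on a connector option.
        env.enableCheckpointing(60_000);
        env.addSource(source).print();
        env.execute("mysql-cdc-resume-sketch");
    }
}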





flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
hello
I'm using the map type with flink-sql 1.11 and have hit a problem: when a value in the map is null, a NullPointerException is thrown. My error and source code are below.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151) ~[flink-avro-1.11.1.jar:1.11.1]
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRe

Re: flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
This is the map type I defined: other_para MAP





Re: flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
other_para MAP





Re: flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
OK, thanks.





Re: flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
OK, I'll give it a try.





Re: flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
I tried using MAP to define my type, and the problem is solved, thanks.





Re: flink-SQL 1.11: NullPointerException on null values in a map type

2020-10-13 Thread
I had patched the source code earlier and had not restored it before that test; it turns out the MAP approach does not actually work.




flinksql: job submission fails after adding flink-parquet_2.11

2020-10-14 Thread
hello,
I'm using flinksql to read kafka data and write it to hdfs in parquet format. When I add the flink-parquet dependency, the job submission fails, while output in the built-in avro, json, etc. formats works fine. My error message is below.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.cloud_behavior_source'.

Table options are:

'connector'='kafka'
'format'='avro'
'properties.bootstrap.servers'='10.2.5.100:9092,10.2.5.101:9092,10.2.5.102:9092'
'properties.group.id'='testGroup'
'scan.startup.mode'='earliest-offset'
'topic'='cloud_behavior'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.cloud_behavior_source'.

Table options are:

'connector'='kafka'
'format'='avro'
'properties.bootstrap.servers'='10.2.5.100:9092,10.2.5.101:9092,10.2.5.102:9092'
'properties.group.id'='testGroup'
'scan.startup.mode'='earliest-offset'
'topic'='cloud_behavior'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.p

flink: custom udf cannot be used after registration

2020-10-15 Thread
hello
When registering a udf with flinkSQL I got the following error. Is there a problem with my definition, or is this a flink bug?
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 11, column 6 to line 11, column 23: No match found for function signature imei_encrypt()
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
    at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
    at org.apache.calcite.sql

Re: flink: custom udf cannot be used after registration

2020-10-15 Thread
Here is my udf declaration:
CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
And here is the udf implementation:
import org.apache.flink.table.functions.ScalarFunction;

public class IMEIEncrypt extends ScalarFunction {

    public String eval(String column_type, String value) {
        EncryptUtils encryptUtils = new EncryptUtils(); // in-house encryption helper
        return encryptUtils.encrypt(column_type, value);
    }
}
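Judging from the follow-ups below and the SQL later in this thread, imei_encrypt(imei) is called with one argument while eval declares two, so validation finds no matching signature. A sketch of a single-argument variant, assuming the column type can be fixed inside the UDF (the "imei" literal is an illustrative assumption):

import org.apache.flink.table.functions.ScalarFunction;

public class IMEIEncrypt extends ScalarFunction {

    // Matches the one-argument call imei_encrypt(imei) used in the SQL.
    public String eval(String value) {
        EncryptUtils encryptUtils = new EncryptUtils(); // in-house helper from the post above
        return encryptUtils.encrypt("imei", value);     // assumed column type
    }
}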






Re: flink: custom udf cannot be used after registration

2020-10-15 Thread
Here is the complete sql job file:

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

-- Kafka cdbp zdao source table
create TABLE cloud_behavior_source(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
targets ARRAY>,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv stats sink table
create TABLE cloud_behavior_sink(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR
)with (
'connector'='filesystem',
'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
'format'='parquet',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='10min'
);

-- Business logic
insert into cloud_behavior_sink
select
 operation,
 operation_channel,
 `time`,
 ip,
 lat,
 lng,
 user_id,
 device_id,
 imei_encrypt(imei) AS imei,
 product_name,
 product_version,
 product_vendor,
 platform,
 platform_version,
 `languaage`,
 locale
FROM cloud_behavior_source;





Re: flink: custom udf cannot be used after registration

2020-10-15 Thread
Yes, this function only needs one argument.





Re: flink: custom udf cannot be used after registration

2020-10-15 Thread
Yes, it was my argument passing that was wrong.





flinkSQL 1.11 writing to jdbc: field types do not match

2020-10-19 Thread
hello
When writing data out to jdbc with flinksql 1.11, I ran into a field type mismatch. Is there a problem with my type definitions?
Below are my exception log and sql file.

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=3;

-- Kafka cdbp zdao source table
create TABLE cloud_behavior_source(
operation STRING,
operation_channel STRING,
`time` STRING,
ip STRING,
lat STRING,
lng STRING,
user_id STRING,
device_id STRING,
imei STRING,
targets ARRAY>,
product_name STRING,
product_version STRING,
product_vendor STRING,
platform STRING,
platform_version STRING,
`languaage` STRING,
locale STRING,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv stats sink table
create TABLE cloud_behavior_sink(
operation STRING,
operation_channel STRING,
ip STRING,
lat STRING,
lng STRING,
user_id STRING,
device_id STRING
) with (
'connector'='jdbc',
'url'='jdbc:mysql://hosts:3306/d_bigdata',
'table-name'='flink_sql_test',
'username'='',
'password'='',
'sink.buffer-flush.max-rows'='100'
);

-- Business logic
insert into cloud_behavior_sink
select
 *
from cloud_behavior_source;

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP]
Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP]
Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplici
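Given the error above, a sketch of the likely fix: the sink declares seven columns, so the insert has to select exactly those columns rather than *:

insert into cloud_behavior_sink
select
 operation,
 operation_channel,
 ip,
 lat,
 lng,
 user_id,
 device_id
from cloud_behavior_source;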

flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 Thread
hello
When using the flinksql connectors: if I put flink-sql-connector-elasticsearch6_2.11-1.11.1.jar under lib, the program runs normally, but when I instead declare it in the pom I get the error below. The same problem occurs with the hbase and jdbc connectors. What could be causing this?
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.cloud_behavior_sink'.

Table options are:

'connector'='elasticsearch-6'
'document-type'='cdbp'
'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
'index'='flink_sql_test'
'sink.bulk-flush.max-actions'='100'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.cloud_behavior_sink'.

Table options are:

'connector'='elasticsearch-6'
'document-type'='cdbp'
'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
'index'='flink_sql_test'
'sink.bulk-flush.max-actions'='100'
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='elasticsearch-6''.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
    ... 37 more
Caused by: org.apache.fli

Re: flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 Thread
Right now there is no ElasticSearch-related connector under my lib; it is only referenced in the pom. Could that produce a conflict, and where might this kind of conflict come from?




Re: flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-20 Thread
For now I can only put the required jars under the lib directory. I can confirm the conflicting class is packaged into the jar and that the class is unique, but I have not yet found the point of conflict.




flinksql: specifying multiple kafka topics

2020-10-25 Thread
hello,
I'd like to ask whether flinksql currently supports specifying multiple kafka topics at once, e.g.
'topic'='cloud_behavior,cloud_behavior_other,cloud_behavior_qxb,cloud_behavior_cc,cloud_behavior_cs'





Re: flinksql: specifying multiple kafka topics

2020-10-25 Thread
Is there a friendlier way? Using topic-pattern makes it too costly to hand over to non-developers.




Re: flinksql: specifying multiple kafka topics

2020-10-25 Thread
OK, then I'll try implementing it via KafkaDynamicTableFactory.





Re: flinksql: specifying multiple kafka topics

2020-10-25 Thread
with (
'connector'='kafka',
'topic'='cloud_behavior;cloud_behavior_other;cloud_behavior_qxb;cloud_behavior_cc;cloud_behavior_cs',
'properties.bootstrap.servers'='',
'properties.group.id'='flink_2_hive_and_imei_ncrypy_test',
'format'='avro',
'scan.startup.mode'='group-offsets'
);

This is my configuration, but it fails at execution time. I'd like to know which flinksql version supports this.
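For reference: to my knowledge, multi-topic 'topic' lists and the 'topic-pattern' option are only supported from the Flink 1.12 Kafka SQL connector onward; the 1.11 connector accepts a single topic. A sketch of the pattern variant under that assumption (schema and broker settings are placeholders):

create TABLE cloud_behavior_union (
operation STRING
) with (
'connector'='kafka',
'topic-pattern'='cloud_behavior.*',
'properties.bootstrap.servers'='',
'properties.group.id'='flink_2_hive_and_imei_ncrypy_test',
'format'='avro',
'scan.startup.mode'='group-offsets'
);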





flinkSQL: setting different windows for a join operation

2020-10-28 Thread
hello
We have a business scenario involving a join between two dynamic tables: one table is fully dynamic, and it needs to join against only the current day's data of the other dynamic table. Is this kind of join scenario supported?




flink 1.11: deserializing avro-format kafka data fails

2020-11-10 Thread
hello, I ran into an error reading avro-format kafka data with flink 1.11. Our avro payload is special, so I made a small change to the source code; the modified snippet is below.
@Override
public T deserialize(byte[] message) throws IOException {
    // read record
    checkAvroInitialized();
    inputStream.setBuffer(message);
    inputStream.skip(5); // skip our payload's custom 5-byte header
    Schema readerSchema = getReaderSchema();
    GenericDatumReader<T> datumReader = getDatumReader();
    datumReader.setSchema(readerSchema);
    return datumReader.read(null, decoder);
}
The source class is org.apache.flink.formats.avro.AvroDeserializationSchema.

The same change works fine on 1.9.0. I'd like to know whether the community has changed anything around reading avro-format data.

The error message is below.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
    at com.intsig.flink.streaming.streaming_project.abtest.GenericRecordTest.main(GenericRecordTest.java:54)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Compl
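If the 5 bytes being skipped are the Confluent wire-format header (one magic byte plus a 4-byte schema id), an alternative to patching Flink's sources is the registry-aware schema shipped in the flink-avro-confluent-registry module. A sketch under that assumption; the registry URL is a placeholder:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class ConfluentAvroSketch {
    // Builds a deserializer for Confluent-framed Avro records; the 5-byte
    // header is handled internally, so no manual skip(5) is needed.
    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forTopic(String readerSchemaJson) {
        Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
        return ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081"); // placeholder registry URL
    }
}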

Re: flink 1.11: deserializing avro-format kafka data fails

2020-11-10 Thread
These are the message lengths I printed out:
message length is: 529
message length is: 212
message length is: 391






orc version mismatch between the hive module dependencies and flink-orc

2021-01-05 Thread
hello
I've run into a problem: when using flink-orc_2.11-1.11.1.jar together with flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar, the two depend on different versions of the orc format. Our hive version is cdh 1.1.0; according to the website it depends on orc 1.4.3 and does not need the orc-shims dependency, but the flink-orc module depends on both orc-core 1.5.6 and orc-shims 1.5.6. How can the two modules be used together?


