Re: debezium/flink-oracle-cdc-connector large memory consumption for transactions

2023-03-03 Thread Антон
Hello,

Try Infinispan - https://debezium.io/documentation/reference/stable/connectors/oracle.html#oracle-event-buffering

At 15:12, 3 March 2023, Alexandr Dmitrov wrote:

Hello! Maybe you could help with the Debezium/Ververica connectors in optimizing memory consumption for large changes in a single transaction, using the Ververica flink-oracle-cdc-connector.

Environment:
flink - 1.15.1
oracle-cdc - 2.3
oracle - 19.3

When updating a large number of rows in a single transaction I get the exception:
"ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [] - Mining session stopped due to the java.lang.OutOfMemoryError: Java heap space".

For a table with 20 fields of types (int, float, timestamp, date, string)*4 I got the following results:
For TaskManager heap space = 540Mi, I could get 250k rows through, but failed on 300k.
For TaskManager heap space = 960Mi, I could get 350k rows through, but failed on 400k.

I am using a SourceFunction created from OracleSource.builder(), with startup-mode set to latest-offset and a self-defined deserializer, where I added log.info to see whether the problem starts after debezium/cdc-connector has done its work, but the Java heap space error occurs before the flow reaches deserialization.

I have tried to provide the following Debezium properties, with no luck:
dbzProps.setProperty("log.mining.batch.size.max", "1");
dbzProps.setProperty("log.mining.batch.size.default", "2000");
dbzProps.setProperty("log.mining.batch.size.min", "100");
dbzProps.setProperty("log.mining.view.fetch.size", "1000");
dbzProps.setProperty("max.batch.size", "64");
dbzProps.setProperty("max.queue.size", "256");

After some digging, I found the places in the code where Debezium consumes changes from Oracle.

Loop to fetch records:
https://github.com/debezium/debezium/blob/1.6/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java#L154

Insert/Update/Delete records are registered in the transaction buffer:
https://github.com/debezium/debezium/blob/1.6/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java#L267

Commit records cause all events in the transaction buffer to be committed - sent forward to the dispatcher, ending in the batch handler:
https://github.com/debezium/debezium/blob/1.6/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java#L144

As far as I can see, the cause of the problem is that Debezium stores all changes locally until the commit of the transaction occurs.

Is there a way to make the connector split the processing of a large number of changes for a single transaction? Or is there another way to get rid of the large memory consumption problem?
Sent from Yandex.Mail for mobile: http://m.ya.ru/ymail
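For reference, a minimal sketch of the Infinispan suggestion above, assuming the Debezium version bundled with the oracle-cdc connector exposes the log.mining.buffer.type option; the accepted values differ between Debezium releases, so treat the exact value below as an assumption to verify against the documentation linked in the reply:

import java.util.Properties;

// Sketch only: switch LogMiner transaction buffering from the default JVM-heap
// buffer to Infinispan, so large uncommitted transactions are no longer
// accumulated on the TaskManager heap.
Properties dbzProps = new Properties();
// Newer Debezium releases accept "infinispan_embedded" / "infinispan_remote",
// older ones a plain "infinispan"; check the docs for the bundled version.
dbzProps.setProperty("log.mining.buffer.type", "infinispan_embedded");
// Additional log.mining.buffer.infinispan.* cache-configuration properties are
// required as well; their exact keys are listed on the same documentation page.

These properties would be handed to the connector the same way as the log.mining.batch.* properties already shown in the quoted question.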

debezium/flink-oracle-cdc-connector large memory consumption for transactions

2023-03-03 Thread Alexandr Dmitrov
Hello! Maybe you could help with the Debezium/Ververica connectors in optimizing memory consumption for large changes in a single transaction, using the Ververica flink-oracle-cdc-connector.

Environment:
flink - 1.15.1
oracle-cdc - 2.3
oracle - 19.3

When updating a large number of rows in a single transaction I get the exception:
"ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [] - Mining session stopped due to the java.lang.OutOfMemoryError: Java heap space".

For a table with 20 fields of types (int, float, timestamp, date, string)*4 I got the following results:
For TaskManager heap space = 540Mi, I could get 250k rows through, but failed on 300k.
For TaskManager heap space = 960Mi, I could get 350k rows through, but failed on 400k.

I am using a SourceFunction created from OracleSource.builder(), with startup-mode set to latest-offset and a self-defined deserializer, where I added log.info to see whether the problem starts after debezium/cdc-connector has done its work, but the Java heap space error occurs before the flow reaches deserialization.

I have tried to provide the following Debezium properties, with no luck:
dbzProps.setProperty("log.mining.batch.size.max", "1");
dbzProps.setProperty("log.mining.batch.size.default", "2000");
dbzProps.setProperty("log.mining.batch.size.min", "100");
dbzProps.setProperty("log.mining.view.fetch.size", "1000");
dbzProps.setProperty("max.batch.size", "64");
dbzProps.setProperty("max.queue.size", "256");

After some digging, I found the places in the code where Debezium consumes changes from Oracle.

Loop to fetch records:
https://github.com/debezium/debezium/blob/1.6/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java#L154

Insert/Update/Delete records are registered in the transaction buffer:
https://github.com/debezium/debezium/blob/1.6/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java#L267

Commit records cause all events in the transaction buffer to be committed - sent forward to the dispatcher, ending in the batch handler:
https://github.com/debezium/debezium/blob/1.6/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java#L144

As far as I can see, the cause of the problem is that Debezium stores all changes locally until the commit of the transaction occurs.

Is there a way to make the connector split the processing of a large number of changes for a single transaction? Or is there another way to get rid of the large memory consumption problem?


FLINK postgresql CDC reports a syntax error

2022-12-01 Thread bmw
Hi, flink postgresql CDC with flink 1.12 and postgresql 9.6.21 reports an error:
CREATE TABLE postgres_cdc_test (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED 
) WITH (
  'connector' = 'postgres-cdc',   
  'hostname' = '192.168.1.101',
  'port' = '5432',  
  'username' = 'postgres',
  'password' = 'test', 
  'database-name' = 'test',
  'schema-name' = 'public',  
  'table-name' = 'test',   
  'debezium.slot.name' = 'customslotname',  
  'decoding.plugin.name' = 'pgoutput'
);


Error message:



flink sql cdc 2.2.1 exception while consuming the mysql binlog

2022-10-09 Thread casel.chen
flink sql cdc 2.2.1 hits the exception below while consuming the mysql binlog. Has anyone seen it? The job gets past it after its own retries, but I would like to know the root cause of the exception. It still occurs after manually restarting the job and consuming again.




Caused by: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.

at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)

at 
com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:72)

at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)

at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)

at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)

at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)

... 1 more

Caused by: io.debezium.DebeziumException: Failed to deserialize data of 
EventHeaderV4{timestamp=1665303438000, eventType=TABLE_MAP, 
serverId=1940348705, headerLength=19, dataLength=91, nextPosition=457067313, 
flags=0}

at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)

... 5 more

Caused by: 
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
 Failed to deserialize data of EventHeaderV4{timestamp=1665303438000, 
eventType=TABLE_MAP, serverId=1940348705, headerLength=19, dataLength=91, 
nextPosition=457067313, flags=0}

at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309)

at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:281)

at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:228)

at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)

at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)

... 3 more

Caused by: java.io.EOFException

at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:209)

at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:51)

at 
com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.readMetadata(TableMapEventDataDeserializer.java:91)

at 
com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:42)

at 
com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:27)

at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)

... 7 more

Flink MySQL CDC: registering with a schema registry

2022-04-22 Thread casel.chen
Hi, I want to use the Flink MySQL CDC Connector in DataStream mode to consume the MySQL binlog and output the change data to a downstream kafka topic (1), while also listening for database schema change events and writing the latest schema to another downstream kafka topic (2), or alternatively registering the schema directly with a confluent / apicurio schema registry. I checked the flink cdc official documentation [1] and found nothing about this. How should it be implemented? Is there any documentation or example? Thanks!


[1] 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

flink mysql cdc: registering with the confluent schema registry

2022-01-29 Thread casel.chen
I would like to use flink mysql cdc to output change data to kafka and at the same time register the table schema with the confluent schema registry, to mimic the effect of debezium with kafka connect [1]. Any advice on where to start? Thanks!


[1] https://blog.csdn.net/OldDirverHelpMe/article/details/107881170

Flink mysql CDC process is healthy, but data loss was found

2022-01-17 Thread Fei Han

@all:
Hi everyone. Flink MySQL CDC syncs data in real time, and the row counts in mysql and in the downstream StarRocks do not match.
StarRocks uses the primary key model.
Versions:
Flink 1.13.3
Flink CDC 2.1.1
The error is as follows:

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.
at 
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1193)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException
at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
at 
com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:65)
at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:262)
at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:241)
at java.io.InputStream.skip(InputStream.java:224)
at 
com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.skipToTheEndOfTheBlock(ByteArrayInputStream.java:280)
at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:305)
at 
com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
... 3 more




Flink mysql cdc sync fails in the early morning

2022-01-12 Thread Fei Han

@all:
The Flink mysql cdc sync failed in the early morning and the streaming jobs all failed. The error is as follows:

org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=3)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)

at sun.reflect.GeneratedMethodAccessor137.invoke(Unknown Source)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)

at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)

at akka.actor.Actor.aroundReceive(Actor.scala:517)

at akka.actor.Actor.aroundReceive$(Actor.scala:515)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: flink mysql cdc sync: field not recognized

2022-01-06 Thread Jark Wu
This error log should not be related: it is an error from the rest client, not from the normal data processing path.

mysql-cdc has no jackson JSON parsing code.

On Wed, 5 Jan 2022 at 17:09, Fei Han 
wrote:

>
> @all:
> Flink mysql cdc data sync reports an unrecognized field. What is causing this? Could it be an unrecognized keyword? The error log is as follows:
>
>  httpResponseStatus=200 OK}
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
> Unrecognized field "status" (class
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as
> ignorable (one known property: "errors"])
>  at [Source: UNKNOWN; line: -1, column: -1] (through reference chain:
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1686)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1635)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:541)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1390)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:483)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_211]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
>


Re: flink mysql cdc data sync error

2022-01-05 Thread Caizhi Weng
Hi!

The root cause is "Caused by: java.io.StreamCorruptedException: unexpected block data", which means the version of this class on the cluster differs from the version on the client. Please check whether the Flink version and the cdc connector version on the cluster and on the client are consistent.

Fei Han wrote on Wednesday, 5 January 2022 at 19:12:

> @all:
>   Hi everyone. When syncing data with Flink mysql cdc, the MySQL data cannot be synced.
> Versions:
> flink 1.13.3
> flink mysql cdc 2.1.1
>
> What causes the error below? Please take a look.
>
>  ERROR org.apache.flink.runtime.rest.RestClient 491 parseResponse -
> Received response was neither of the expected type ([simple type, class
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody])
> nor an error.
> Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"61f45c66580d385727fd97520b66a79a","application-status":"FAILED","accumulator-results":{},"net-runtime":55,"failure-cause":{"class":"org.apache.flink.runtime.client.JobInitializationException","stack-trace":"org.apache.flink.runtime.client.JobInitializationException:
> Could not start the JobMaster.\n\tat
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> java.util.concurrent.CompletionException: java.lang.RuntimeException:
> org.apache.flink.runtime.JobException: Cannot instantiate the coordinator
> for operator Source: TableSourceScan(table=[[qhc_ods_catalog, qhc_hms,
> order_consume_buy_order_src]], fields=[id, consume_order_id,
> consume_order_code, sku_id, sku_name, is_card_goods, buy_order_id,
> buy_order_code, buy_order_type, consume_order_time, gmt_create,
> gmt_modified, database_name, op_ts, table_name]) -> Calc(select=[id,
> consume_order_id, consume_order_code, sku_id, sku_name, is_card_goods,
> buy_order_id, buy_order_code, buy_order_type, consume_order_time,
> gmt_create, gmt_modified]) -> NotNullEnforcer(fields=[id,
> consume_order_id])\n\tat
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)\n\t...
> 7 more\nCaused by: java.lang.RuntimeException:
> org.apache.flink.runtime.JobException: Cannot instantiate the coordinator
> for operator Source: TableSourceScan(table=[[qhc_ods_catalog, qhc_hms,
> order_consume_buy_order_src]], fields=[id, consume_order_id,
> consume_order_code, sku_id, sku_name, is_card_goods, buy_order_id,
> buy_order_code, buy_order_type, consume_order_time, gmt_create,
> gmt_modified, database_name, op_ts, table_name]) -> Calc(select=[id,
> consume_order_id, consume_order_code, sku_id, sku_name, is_card_goods,
> buy_order_id, buy_order_code, buy_order_type, consume_order_time,
> gmt_create, gmt_modified]) -> NotNullEnforcer(fields=[id,
> consume_order_id])\n\tat
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)\n\tat
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\t...
> 7 more\nCaused by: org.apache.flink.runtime.JobException: Cannot
> instantiate the coordinator for operator Source:
> TableSourceScan(table=[[qhc_ods_catalog, qhc_hms,
> order_consume_buy_order_src]], fields=[id, consume_order_id,
> consume_order_code, sku_id, sku_name, is_card_goods, buy_order_id,
> buy_order_code, buy_order_type, consume_order_time, gmt_create,
> gmt_modified, database_name, op_ts, table_name]) -> Calc(select=[id,
> consume_order_id, consume_order_code, sku_id, sku_name, is_card_goods,
> buy_order_id, buy_order_code, buy_order_type, consume_order_time,
> gmt_create, gmt_modified]) -

flink mysql cdc sync: field not recognized

2022-01-05 Thread Fei Han

@all:
Flink mysql cdc data sync reports an unrecognized field. What is causing this? Could it be an unrecognized keyword? The error log is as follows:

 httpResponseStatus=200 OK} 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
 Unrecognized field "status" (class 
org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as 
ignorable (one known property: "errors"])
 at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: 
org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1686)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1635)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:541)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1390)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:483) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) 
[?:1.8.0_211]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 [?:1.8.0_211]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 [?:1.8.0_211]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_211]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_211]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]


flink sql cdc: keyBy on the primary key before writing to the store

2021-07-10 Thread casel.chen
Scenario: mysql data is synced to mongodb in real time. The upstream mysql binlog is published to a kafka topic, and records with the same primary key are not guaranteed to land in the same partition. To make sure all records for the same primary key are stored in order by the downstream mongodb sink, they need to be keyed by the primary key, and the downstream then writes to mongodb in batches.
Question: can flink sql solve this? If so, how should it be written?


create table person_source (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age BIGINT
) with (
   'connector' = 'kafka',
   ..
   'format' = 'canal-json'
);


create view person_view as 
select id, ??? from person_source group by id;


create table person_sink (
   id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age BIGINT
) with (
   'connector' = 'mongodb',
   ..
   'format' = 'json'
);


insert into person_sink select * from person_view;

Re: how to get metadata with flink sql cdc

2021-06-22 Thread Leonard Xu
Hello,

Flink sql cdc does not support reading metadata yet. What do the business scenarios that need metadata usually look like?

Best,
Leonard




> On 23 June 2021 at 08:21, casel.chen wrote:
> 
> How can metadata be obtained with flink sql cdc? For example the database name, table name, operation type, binlog timestamp, and so on.
> 
> 
> create table xxx_tbl (
>   k_op varchar, -- operation type
>   k_database varchar, -- database name
>   k_table varchar, -- table name
>   k_ts BIGINT, -- binlog timestamp
>   id BIGINT,
>   name varchar
> ) with (
>   'connector' = 'mysql-cdc',
>   .
>   'meta.fields-prefix' = 'k_'
> )



how to get metadata with flink sql cdc

2021-06-22 Thread casel.chen
How can metadata be obtained with flink sql cdc? For example the database name, table name, operation type, binlog timestamp, and so on.


create table xxx_tbl (
   k_op varchar, -- operation type
   k_database varchar, -- database name
   k_table varchar, -- table name
   k_ts BIGINT, -- binlog timestamp
   id BIGINT,
   name varchar
) with (
   'connector' = 'mysql-cdc',
   .
   'meta.fields-prefix' = 'k_'
)

Re: Re: After doing computations with flink sql cdc, the elastic search connector with a multi-parallelism sink occasionally has problems! The data volume is large and it has not been reliably reproduced yet. Has anyone run into a similar problem?

2021-06-17 Thread Jark Wu
The community recently redesigned the mysql-cdc implementation: it supports parallel reads during the snapshot phase and checkpointing, and removes the global lock dependency.
You can follow the GitHub repository for updates: https://github.com/ververica/flink-cdc-connectors.
The design and implementation will also be presented at the July meetup - stay tuned.

Best,
Jark

On Thu, 17 Jun 2021 at 09:34, casel.chen  wrote:

> When will Flink CDC support changing the parallelism for fine-grained resource control? I am also hitting the problem that a flink sql
> cdc job writing to mysql cannot keep up with the upstream write rate. When will something like mysql's parallel replication be supported?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-06-16 17:27:14, "Leonard Xu" wrote:
> >This does not look related to Flink-CDC. The exception in the stack trace is thrown on the ES side: version_conflict_engine_exception.
> >Look into that exception and check whether there are conflicting writes (other jobs or other business processes also writing to the synced table).
> >
> >Best,
> >Leonard
> >
> >> On 16 June 2021 at 17:05, mokaful <649713...@qq.com> wrote:
> >>
> >> Same problem here. Is there any way to handle it?
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:Re: After doing computations with flink sql cdc, the elastic search connector with a multi-parallelism sink occasionally has problems! The data volume is large and it has not been reliably reproduced yet. Has anyone run into a similar problem?

2021-06-16 Thread casel.chen
When will Flink CDC support changing the parallelism for fine-grained resource control? I am also hitting the problem that a flink sql cdc job writing to mysql cannot keep up with the upstream write rate. When will something like mysql's parallel replication be supported?

















On 2021-06-16 17:27:14, "Leonard Xu" wrote:
>This does not look related to Flink-CDC. The exception in the stack trace is thrown on the ES side: version_conflict_engine_exception.
>Look into that exception and check whether there are conflicting writes (other jobs or other business processes also writing to the synced table).
>
>Best,
>Leonard
>
>> On 16 June 2021 at 17:05, mokaful <649713...@qq.com> wrote:
>> 
>> Same problem here. Is there any way to handle it?
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: After doing computations with flink sql cdc, the elastic search connector with a multi-parallelism sink occasionally has problems! The data volume is large and it has not been reliably reproduced yet. Has anyone run into a similar problem?

2021-06-16 Thread Leonard Xu
This does not look related to Flink-CDC. The exception in the stack trace is thrown on the ES side: version_conflict_engine_exception.
Look into that exception and check whether there are conflicting writes (other jobs or other business processes also writing to the synced table).

Best,
Leonard

> On 16 June 2021 at 17:05, mokaful <649713...@qq.com> wrote:
> 
> Same problem here. Is there any way to handle it?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: After doing computations with flink sql cdc, the elastic search connector with a multi-parallelism sink occasionally has problems! The data volume is large and it has not been reliably reproduced yet. Has anyone run into a similar problem?

2021-06-16 Thread mokaful
Same problem here. Is there any way to handle it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql cdc to kafka: how to increase the sink parallelism?

2021-06-14 Thread casel.chen
When flink sql cdc writes to kafka, the downstream kafka writer shows a parallelism of only 1. Is there a way to increase it?

job-parallelism, table.exec.resource.default-parallelism and parallelism.default all show 24, but the execution graph still shows a parallelism of 1, even though I set pipeline.operator-chaining=false.

Re:Re: flink sql cdc data sync to mysql

2021-06-12 Thread casel.chen
How can the downstream sink parallelism be increased in the flink sql cdc scenario?
I tried setting default.parallelism=2 and turning operator chaining off, with no effect.
I then split the job into two steps: first sink the source mysql cdc into upsert kafka, then sink from upsert kafka into the target mysql, in order to increase the sink parallelism through the kafka partitions.
Initial tests look good: the kafka topic has 3 partitions, each partition receives data by primary key hash, and the downstream parallelism matches the number of partitions.


The job SQL is as follows:


-- source
CREATE TABLE mysql_old_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'flink-test',
'table-name' = 'old_order'
);

-- sink
CREATE TABLE kafka_order_table
(
order_number BIGINT,
priceDECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

-- insert
INSERT INTO kafka_order_table SELECT * FROM mysql_old_order_table;







-- source
CREATE TABLE kafka_order_table
(
order_number BIGINT,
priceDECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

-- sink
CREATE TABLE mysql_order_table
(
order_number BIGINT,
priceDECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/flink-test',
'table-name' = 'order',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '3',
'sink.buffer-flush.interval' = '1s'
);

-- insert
INSERT INTO mysql_order_table SELECT * FROM kafka_order_table;





On 2021-06-08 19:49:40, "Leonard Xu" wrote:
>Let me try to answer these two questions.
>
>> The jdbc connector in flink 1.12 does not support the sink.parallelism option. Does that mean the sink can only keep the same parallelism of 1 as the upstream cdc
>> connector? If so, the downstream sink becomes a bottleneck: the table holds a lot of historical data, syncing the existing data to the target table takes quite a while, and new business data keeps flowing in, so the sink cannot catch up. How can this be solved?
>Yes. The key point is that the cdc connector can only run with a single parallelism in order to guarantee data consistency, so the job can only run with a single parallelism as well. This needs the cdc
>connector to support parallel reads; once it does, the downstream sink problem solves itself.
>
>
>> The jdbc connector in flink 1.13 adds the sink.parallelism option. The question is: if the parallelism is increased, can records with the same primary key be guaranteed to reach the same sink task? What is the correct way to write the SQL?
>
>Not only in sync scenarios - the sink.parallelism option needs care in other scenarios too. The framework currently does not guarantee that records with the same pk are sent to the same task; the user has to make sure that the pk defined on the sink matches the key on which the upstream data is shuffled (for example the group key or join key),
>otherwise the data may arrive out of order. The community is also working on solving this through plan derivation, see
>https://issues.apache.org/jira/browse/FLINK-20374
>https://issues.apache.org/jira/browse/FLINK-22901
>
>Best,
>Leonard


Re:Re: flink sql cdc data sync to mysql

2021-06-10 Thread casel.chen



Given that the downstream parallelism of flink sql cdc currently cannot be changed, could this be done in two steps? Thanks!
1. flink sql cdc writes to a downstream kafka topic through the upsert kafka connector, in debezium or canal format, with multiple partitions on the topic
2. consume from kafka again and sync to the final mysql database with flink sql














On 2021-06-08 19:49:40, "Leonard Xu" wrote:
>Let me try to answer these two questions.
>
>> The jdbc connector in flink 1.12 does not support the sink.parallelism option. Does that mean the sink can only keep the same parallelism of 1 as the upstream cdc
>> connector? If so, the downstream sink becomes a bottleneck: the table holds a lot of historical data, syncing the existing data to the target table takes quite a while, and new business data keeps flowing in, so the sink cannot catch up. How can this be solved?
>Yes. The key point is that the cdc connector can only run with a single parallelism in order to guarantee data consistency, so the job can only run with a single parallelism as well. This needs the cdc
>connector to support parallel reads; once it does, the downstream sink problem solves itself.
>
>
>> The jdbc connector in flink 1.13 adds the sink.parallelism option. The question is: if the parallelism is increased, can records with the same primary key be guaranteed to reach the same sink task? What is the correct way to write the SQL?
>
>Not only in sync scenarios - the sink.parallelism option needs care in other scenarios too. The framework currently does not guarantee that records with the same pk are sent to the same task; the user has to make sure that the pk defined on the sink matches the key on which the upstream data is shuffled (for example the group key or join key),
>otherwise the data may arrive out of order. The community is also working on solving this through plan derivation, see
>https://issues.apache.org/jira/browse/FLINK-20374
>https://issues.apache.org/jira/browse/FLINK-22901
>
>Best,
>Leonard


flink sql cdc: support for extra fields

2021-06-10 Thread casel.chen
When flink sql cdc writes to kafka, I would like the kafka messages to also carry the database, the table, the change time, and the change flag (+I/-U/+U/-D) as special fields. As far as I can tell this is currently not possible - is that right?

Re: flink sql cdc data sync to mysql

2021-06-08 Thread Leonard Xu
Let me try to answer these two questions.

> The jdbc connector in flink 1.12 does not support the sink.parallelism option. Does that mean the sink can only keep the same parallelism of 1 as the upstream cdc
> connector? If so, the downstream sink becomes a bottleneck: the table holds a lot of historical data, syncing the existing data to the target table takes quite a while, and new business data keeps flowing in, so the sink cannot catch up. How can this be solved?
Yes. The key point is that the cdc connector can only run with a single parallelism in order to guarantee data consistency, so the job can only run with a single parallelism as well. This needs the cdc connector to support parallel reads; once it does, the downstream sink problem solves itself.


> The jdbc connector in flink 1.13 adds the sink.parallelism option. The question is: if the parallelism is increased, can records with the same primary key be guaranteed to reach the same sink task? What is the correct way to write the SQL?

Not only in sync scenarios - the sink.parallelism option needs care in other scenarios too. The framework currently does not guarantee that records with the same pk are sent to the same task; the user has to make sure that the pk defined on the sink matches the key on which the upstream data is shuffled (for example the group key or join key), otherwise the data may arrive out of order. The community is also working on solving this through plan derivation, see
https://issues.apache.org/jira/browse/FLINK-20374
https://issues.apache.org/jira/browse/FLINK-22901

Best,
Leonard
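To illustrate the point about keeping the upstream shuffle key aligned with the sink primary key, a rough Flink 1.13 SQL sketch follows; every table name, column and connection option in it is made up, and 'sink.parallelism' is the jdbc option discussed above:

-- Hypothetical CDC source (connection options are placeholders).
CREATE TABLE order_source (
  order_id BIGINT,
  price    DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'demo',
  'table-name' = 'orders'
);

-- Keep the shuffle key (GROUP BY order_id) identical to the sink primary key,
-- so that all changes for one key land on the same sink subtask even with
-- 'sink.parallelism' greater than 1.
CREATE TABLE order_sink (
  order_id BIGINT,
  total    DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/demo',
  'table-name' = 'order_totals',
  'username' = 'root',
  'password' = 'root',
  'sink.parallelism' = '4'
);

INSERT INTO order_sink
SELECT order_id, CAST(SUM(price) AS DECIMAL(10, 2)) AS total
FROM order_source
GROUP BY order_id;  -- group key == sink primary key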

flink sql cdc data sync to mysql

2021-06-08 Thread casel.chen
The jdbc connector in flink 1.12 does not support the sink.parallelism option. Does that mean the sink can only keep the same parallelism of 1 as the upstream cdc connector? If so, the downstream sink becomes a bottleneck: the table holds a lot of historical data, syncing the existing data to the target table takes quite a while, and new business data keeps flowing in, so the sink cannot catch up. How can this be solved?
The jdbc connector in flink 1.13 adds the sink.parallelism option. The question is: if the parallelism is increased, can records with the same primary key be guaranteed to reach the same sink task? What is the correct way to write the SQL?

flink sql cdc data sync creates too many jobs

2021-06-06 Thread casel.chen
flink sql cdc data sync is done per database + table, and with so many tables the number of jobs becomes too large. Can flink sql cdc sync at the database level? That would greatly reduce the number of jobs.

MySQL primary/replica failover breaks the binlog position consumed via flink mysql-cdc

2021-06-03 Thread 董建
For various reasons the DBAs performed a primary/replica failover of the database.
I use flink mysql-cdc to collect the binlog, but after the failover the binlog position no longer matches.
The Flink job restarts automatically and, once the configured restart strategy is exhausted, it dies with the following log:
org.apache.kafka.connect.errors.ConnectException: The connector is trying to 
read binlog starting at GTIDs 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004 
and binlog file 'mysql-bin.000650', pos=521310219, skipping 2 events plus 1 
rows, but this is no longer available on the server. Reconfigure the connector 
to use a snapshot when needed.


Because pos=521310219 is not a valid position on the new database server, and the last checkpoint Flink took automatically already stores pos=521310219, resuming via flink -s cannot continue consuming and the job fails to start.
Does anyone have a good way to solve this?





How to prevent a flink sql cdc job from consuming the binlog from the beginning again after a restart?

2021-06-02 Thread casel.chen
I have the following flink sql cdc job with 'scan.startup.mode' = 'latest-offset'. After a restart the job consumes the binlog from the beginning again, which makes the downstream sink database report duplicate key errors constantly. Is there any way to avoid this?


CREATE TABLE `mysql_source` (
`id` STRING,
`acct_id` STRING,
`acct_name` STRING,
`acct_type` STRING,
`acct_bal` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'mysql',
'password' = 'mysql',
'database-name' = 'test',
'scan.startup.mode' = 'latest-offset',
'table-name' = 'test',
'server-time-zone' = 'Asia/Shanghai'
);

Can flink sql cdc keep the before and after fields when collecting the mysql binlog?

2021-05-31 Thread 董建
Can flink sql cdc keep the before and after fields when collecting the mysql binlog?
Following the official example, after defining the table schema you only get the latest field values, right?
Can both before and after be kept at the same time?

Re: Re:Re: flink sql cdc parallelism question

2021-05-28 Thread Zorro
If you are implementing the MongoDB sink yourself, the approach you describe looks feasible, but that kind of implementation is relatively complex.

For the SQL keyBy you can consider the Deduplication feature that Flink provides. With that, the MongoDB sink can run with multiple parallel instances without having to worry about the ordering of different keys.
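A minimal sketch of the Deduplication pattern mentioned above, reusing the person_source table from the earlier keyBy thread in this digest and assuming it additionally declares a processing-time attribute column such as proc_time AS PROCTIME():

-- Keep only the latest row per id; Flink recognizes this ROW_NUMBER() pattern
-- as a Deduplication query, so downstream operators see at most one live
-- record per key and the sink can run with multiple parallel instances.
CREATE VIEW person_view AS
SELECT id, name, age
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time DESC) AS row_num
  FROM person_source
)
WHERE row_num = 1;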



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: flink sql cdc parallelism question

2021-05-27 Thread casel.chen
My job uses flink sql to consume the mysql cdc binlog and sync it to mongodb in real time. With a single parallelism, the mongodb writes may not keep up with the mysql write rate, so I need a larger parallelism on the sink side.
I am not sure how to express keyBy in SQL - would it be a group by on the pk? My original idea was to open a thread pool inside MongoDBSinkFunction, one thread per downstream sink parallel instance, each with its own queue; MongoDBSinkFunction dispatches each record to the queue matching its PK, and each consumer thread pulls from its own queue and does batched inserts. Would that work?
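For what it is worth, a rough, purely hypothetical sketch of the per-key queue idea described above; the class and method names are invented, the MongoDB write itself is left abstract, and checkpointing / exactly-once and failure propagation are ignored, all of which a real connector would have to handle:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hash each record by its primary key onto a fixed set of queues, each drained
// by one writer thread that batches inserts, so per-key order is preserved
// while writes happen in parallel.
public abstract class KeyedQueueSink<T> extends RichSinkFunction<T> {

    private final int workerCount;
    private final int batchSize;
    private transient List<BlockingQueue<T>> queues;
    private transient List<Thread> workers;
    private transient volatile boolean running;

    protected KeyedQueueSink(int workerCount, int batchSize) {
        this.workerCount = workerCount;
        this.batchSize = batchSize;
    }

    /** Extract the primary key of a record (application-specific). */
    protected abstract Object keyOf(T record);

    /** Write one ordered batch, e.g. via a MongoDB bulk write (left abstract). */
    protected abstract void writeBatch(List<T> batch);

    @Override
    public void open(Configuration parameters) {
        running = true;
        queues = new ArrayList<>();
        workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            final BlockingQueue<T> queue = new ArrayBlockingQueue<>(10_000);
            queues.add(queue);
            Thread worker = new Thread(() -> drain(queue), "keyed-sink-worker-" + i);
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }
    }

    private void drain(BlockingQueue<T> queue) {
        List<T> batch = new ArrayList<>(batchSize);
        try {
            while (running || !queue.isEmpty()) {
                T record = queue.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    batch.add(record);
                }
                // Flush on a full batch, or when the queue has gone idle for a moment.
                if (batch.size() >= batchSize || (record == null && !batch.isEmpty())) {
                    writeBatch(batch);
                    batch.clear();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void invoke(T record, Context context) throws Exception {
        // Records with the same key always land on the same queue and thread,
        // which keeps their relative order.
        int index = Math.floorMod(keyOf(record).hashCode(), workerCount);
        queues.get(index).put(record);
    }

    @Override
    public void close() throws Exception {
        running = false;
        for (Thread worker : workers) {
            worker.join();
        }
    }
}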

















On 2021-05-26 14:22:11, "Zorro" wrote:
>The mysql-cdc connector can only be set to a parallelism of 1, mainly for these reasons:
>1. the mysql binlog is essentially a single file; consuming it with multiple parallel instances would have to avoid duplicates
>2. with multiple parallel consumers it is hard to guarantee ordering
>
>The sink can be set to multiple parallel instances, but then ordering is not guaranteed. If records with the same primary key need to go to the same sink thread, you can do a keyBy first and keep the keyBy parallelism equal to the sink parallelism; that basically keeps the data forwarded, although even then ordering is not 100% guaranteed.
>
>If ordering must be guaranteed, a sink parallelism of 1 is still recommended
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql cdc parallelism question

2021-05-26 Thread Zorro
The mysql-cdc connector can only be set to a parallelism of 1, mainly for these reasons:
1. the mysql binlog is essentially a single file; consuming it with multiple parallel instances would have to avoid duplicates
2. with multiple parallel consumers it is hard to guarantee ordering

The sink can be set to multiple parallel instances, but then ordering is not guaranteed. If records with the same primary key need to go to the same sink thread, you can do a keyBy first and keep the keyBy parallelism equal to the sink parallelism; that basically keeps the data forwarded, although even then ordering is not 100% guaranteed.

If ordering must be guaranteed, a sink parallelism of 1 is still recommended
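As a small illustration of the keyBy-before-sink idea above (DataStream API; the Row layout and the mongoSink argument are assumptions, not part of any existing connector):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;

public final class KeyedSinkWiring {
    private KeyedSinkWiring() {}

    /** Wire a CDC change stream to a sink so that all changes for one primary
     *  key (assumed here to be a BIGINT in field 0) reach the same sink subtask. */
    public static void attachKeyedSink(DataStream<Row> changes,
                                       SinkFunction<Row> mongoSink,
                                       int parallelism) {
        changes
            .keyBy(row -> (Long) row.getField(0))  // shuffle by primary key
            .addSink(mongoSink)                    // user-provided MongoDB sink
            .setParallelism(parallelism);          // keep equal to the keyBy parallelism
    }
}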



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql cdc parallelism question

2021-05-24 Thread casel.chen
flink sql job: consume the mysql binlog and sync the data to mongodb
Questions:
1. Can the mysql-cdc connector only be set to a parallelism of 1?
2. Can the mongodb sink parallelism be increased? If so, how is it configured? Does it guarantee that records with the same primary key are sent to the same sink partition?

Re: does flink mysql cdc support mysql's JSON type?

2021-05-23 Thread hk__lrzy
It is supported:
https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-basic-types



--
Sent from: http://apache-flink.147419.n8.nabble.com/


does flink mysql cdc support mysql's JSON type?

2021-05-18 Thread 董建
Does flink mysql cdc support mysql's JSON type?

flink mysql cdc????

2021-04-22 Thread ????
??flink mysql cdc
1.flink mysql 
cdc??mysql??binlog??mysql
 

Reply: flink sql cdc to kafka - table name information missing in messages

2021-04-22 Thread ????
flink-cdcSourceRecord??SourceRecord??topic??
??Debezium 
mysql-conectorkafka-connectortopic??
?? ??+??+topic?? 
SourceRecord??topic



------------------ Original message ------------------
From: "user-zh"



Re: Reply: flink sql cdc to kafka - table name information missing in messages

2021-04-22 Thread casel.chen



My question is exactly why flink cdc, after integrating debezium, loses the original information. Using native debezium or canal to sync the data certainly works, but if flink cdc could emit it directly, wouldn't that save those components and their operational overhead? That was the point of flink cdc in the first place.













On 2021-04-22 11:01:22, "飞翔" wrote:

In that case, why use flink to sync the data at all and lose the original information? You can use native debezium or canal directly to sync the data and send it to kafka.
Take canal's sample output: even though the "after" part is not complete, you can reconstruct and fill it in yourself. So just using debezium would do - which is also why flink-cdc integrates debezium: both the before and after of an update form a complete record.





------------------ Original message ------------------
From: "user-zh" ;
Sent: Thursday, 22 April 2021, 09:41
To: "user-zh@flink.apache.org";
Subject: flink sql cdc to kafka - table name information missing in messages


A recent requirement is to use flink to read the mysql binlog and publish the change records to a downstream kafka (similar to what canal server or debezium do). The downstream kafka messages need to carry the before, after, op_type, ts, database and table fields. With the script below, the kafka messages that come out only contain data and op_type; the other information is not available. Tracing upstream, the record emitted by debezium (flink cdc is built on debezium) itself only carries data and op_type. Is there another way to obtain the full original change record?


CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter` STRING,
`tag` STRING,
`remark` STRING,
`create_date` TIMESTAMP,
`update_date` TIMESTAMP,
`reserve` STRING,
`sys_name` STRING,
`metric_seq` INT,
`advanced_function` STRING,
`value_type` STRING,
`value_field` STRING,
`status` INT,
`syn_date` TIMESTAMP,
`confirmer` STRING,
`confirm_time` TIMESTAMP,
`index_explain` STRING,
`field_name` STRING,
`tag_values` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${mysql.hostname}',
  'port' = '3306',
  'username' = '${mysql.username}',
  'password' = '${mysql.password}',
  'database-name' = '${mysql.database}',
  'table-name' = '${mysql.table}'
  );


CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'format' = 'canal-json'
  );


INSERT INTO `kafka_sink`
(SELECT *
 FROM `binlog_table`);







Reply: flink sql cdc to kafka - table name information missing in messages

2021-04-21 Thread 飞翔
In that case, why use flink to sync the data at all and lose the original information? You can use native debezium or canal directly to sync the data and send it to kafka.
Take canal's sample output: even though the "after" part is not complete, you can reconstruct and fill it in yourself. So just using debezium would do - which is also why flink-cdc integrates debezium: both the before and after of an update form a complete record.




------------------ Original message ------------------
From: "user-zh"



flink sql cdc to kafka - table name information missing in messages

2021-04-21 Thread casel.chen
A recent requirement is to use flink to read the mysql binlog and publish the change records to a downstream kafka (similar to what canal server or debezium do). The downstream kafka messages need to carry the before, after, op_type, ts, database and table fields. With the script below, the kafka messages that come out only contain data and op_type; the other information is not available. Tracing upstream, the record emitted by debezium (flink cdc is built on debezium) itself only carries data and op_type. Is there another way to obtain the full original change record?


CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter` STRING,
`tag` STRING,
`remark` STRING,
`create_date` TIMESTAMP,
`update_date` TIMESTAMP,
`reserve` STRING,
`sys_name` STRING,
`metric_seq` INT,
`advanced_function` STRING,
`value_type` STRING,
`value_field` STRING,
`status` INT,
`syn_date` TIMESTAMP,
`confirmer` STRING,
`confirm_time` TIMESTAMP,
`index_explain` STRING,
`field_name` STRING,
`tag_values` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${mysql.hostname}',
  'port' = '3306',
  'username' = '${mysql.username}',
  'password' = '${mysql.password}',
  'database-name' = '${mysql.database}',
  'table-name' = '${mysql.table}'
  );


CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'format' = 'canal-json'
  );


INSERT INTO `kafka_sink`
(SELECT *
 FROM `binlog_table`);







flink sql cdc streams (1.11.2): after a two-stream join had been running for a long time, I cancelled the job and resumed it from the checkpoint, and it fails with OutOfMemoryError. The TM configuration has not changed; RocksDB backend. What could be the cause? It did not fail while running - why does it fail at recovery time?

2020-12-23 Thread jindy_liu


java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3181)
at java.util.ArrayList.grow(ArrayList.java:261)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:235)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:227)
at java.util.ArrayList.add(ArrayList.java:458)
at
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:212)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:199)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$345/1080469422.accept(Unknown
Source)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$199/62611056.runDefaultAction(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink sql cdc sum result turns out NULL

2020-11-30 Thread kandy.wang









@Jianzhi Zhang

Yes, that was the cause - thanks for the reply. It was indeed a decimal precision issue.
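For illustration, a minimal sketch of the kind of fix implied here, assuming the aggregate simply needs more head-room than DECIMAL(10, 5); column and table names are taken from the quoted thread, the connection options are placeholders, the precision should be chosen to fit the real data, and the MySQL column would need widening to match:

-- Give the aggregated amount enough precision so the SUM no longer overflows
-- the declared type (the overflow is what surfaced as NULL in the result table).
CREATE TABLE mysql_realtime_leaving_price_spu_index_agg (
  spu_id        BIGINT,
  leaving_price DECIMAL(38, 5),
  PRIMARY KEY (spu_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/demo',
  'table-name' = 'mysql_realtime_leaving_price_spu_index_agg'
);

INSERT INTO mysql_realtime_leaving_price_spu_index_agg
SELECT v_spu_id AS spu_id,
       CAST(SUM(leaving_num * price) AS DECIMAL(38, 5)) AS leaving_price
FROM source_table  -- stands in for hive.database.table from the thread
GROUP BY v_spu_id;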




On 2020-12-01 13:24:23, "Jianzhi Zhang" wrote:
>Could it be that your decimal column is too short, so the computed result exceeded the precision range and produced the NULL?
>
>> On 19 November 2020 at 22:41, kandy.wang wrote:
>> 
>> -- MySQL table
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>   `id` INT UNSIGNED AUTO_INCREMENT,
>>   `spu_id` BIGINT NOT NULL,
>>   `leaving_price`  DECIMAL(10, 5),
>>   PRIMARY KEY ( `id` ),
>>   unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> 
>> 
>> -- Flink table
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>   `spu_id` BIGINT ,
>>   `leaving_price`  DECIMAL(10, 5),
>>PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>  'connector' = 'jdbc',
>>   'url' = 'jdbc:mysql://...',
>>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>   'username' = '...',
>>   'password' = '..'
>> );
>> 
>> 
>> -- binlog to mysql
>> 
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> 
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> 
>> FROM hive.database.table
>> 
>> group by v_spu_id;
>> 
>> 
>> hive.database.table points to our company's own kafka, which implements the same 4 message formats as canal cdc.
>> 
>> 
>> The problem is that the leaving_price column in the MySQL result table contains many NULL values, even though, looking at the data, the leaving_num and price fields can never be NULL. I do not understand why the result table ends up NULL.
>> Are there any good ways to troubleshoot this?
>> 
>> 
>> 
>> 
>> 


Re: flink sql cdc sum result turns out NULL

2020-11-30 Thread Jianzhi Zhang
Could it be that your decimal column is too short, so the computed result exceeded the precision range and produced the NULL?

> On 19 November 2020 at 22:41, kandy.wang wrote:
> 
> -- MySQL table
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>   `id` INT UNSIGNED AUTO_INCREMENT,
>   `spu_id` BIGINT NOT NULL,
>   `leaving_price`  DECIMAL(10, 5),
>   PRIMARY KEY ( `id` ),
>   unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> 
> 
> -- Flink table
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>   `spu_id` BIGINT ,
>   `leaving_price`  DECIMAL(10, 5),
>PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://...',
>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>   'username' = '...',
>   'password' = '..'
> );
> 
> 
> -- binlog to mysql
> 
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> 
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> 
> FROM hive.database.table
> 
> group by v_spu_id;
> 
> 
> hive.database.table points to our company's own kafka, which implements the same 4 message formats as canal cdc.
> 
> 
> The problem is that the leaving_price column in the MySQL result table contains many NULL values, even though, looking at the data, the leaving_num and price fields can never be NULL. I do not understand why the result table ends up NULL.
> Are there any good ways to troubleshoot this?
> 
> 
> 
> 
> 



After doing computations with flink sql cdc, the elastic search connector with a multi-parallelism sink occasionally has problems! The data volume is large and it has not been reliably reproduced yet. Has anyone run into a similar problem?

2020-11-30 Thread jindy_liu
Flink version: 1.11.2

*
Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[64_40108_0_1]: version
conflict, required seqNo [95958], primary term [1]. current document has
seqNo [99881] and primary term [1]]]*


Full details:

2020-11-13 11:07:04
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[64_40108_0_1]: version
conflict, required seqNo [95958], primary term [1]. current document has
seqNo [99881] and primary term [1]]]
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:407)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:138)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:196)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1581)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1663)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:590)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:333)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:327)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at

Re: Re: flink sql cdc writing data to mysql: related class cannot be found

2020-11-27 Thread cljb...@163.com
Thanks for the reply!
I just found the problem: the pom dependency copied from the Maven website had its scope set to test... Changing it to compile fixed it.



cljb...@163.com
 
From: Jark Wu
Sent: 2020-11-27 19:14
To: user-zh
Subject: Re: flink sql cdc writing data to mysql: related class cannot be found
My guess is that your flink-json conflicts with the flink-json already bundled in the framework - perhaps the flink-json version you added is not 1.11.x?
 
On Fri, 27 Nov 2020 at 19:03, cljb...@163.com  wrote:
 
> The relevant dependencies have been added; I do not know what causes the problem below. Please help!
> The added dependencies are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error message is as follows:
>
> java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory: Provider
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not
> be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/formats/json/JsonOptions
> at
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.(ChangelogJsonFormatFactory.java:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.formats.json.JsonOptions
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 32 common frames omitted
>
>
>
> cljb...@163.com
>


Re: flink sql cdc writing data to mysql: cannot find the relevant class

2020-11-27 Thread Jark Wu
It is probably a conflict between your flink-json and the flink-json already bundled into the framework; maybe the flink-json version you added is not 1.11.x?

On Fri, 27 Nov 2020 at 19:03, cljb...@163.com  wrote:

> 相关的依赖以及添加,不知道如下问题是如何导致,求解!
> 已添加的依赖有:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> 报错信息如下:
>
> java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory: Provider
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not
> be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/formats/json/JsonOptions
> at
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.(ChangelogJsonFormatFactory.java:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.formats.json.JsonOptions
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 32 common frames omitted
>
>
>
> cljb...@163.com
>


flink sql cdc writing data to mysql: cannot find the relevant class

2020-11-27 Thread cljb...@163.com
The relevant dependencies have already been added; I don't know what causes the problem below. Please help!
The dependencies added are:
flink-connector-mysql-cdc
flink-format-changelog-json
flink-json

The error message is:

java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: 
Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory 
could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/formats/json/JsonOptions
at 
com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.(ChangelogJsonFormatFactory.java:53)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.formats.json.JsonOptions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 32 common frames omitted



cljb...@163.com


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-27 Thread jindy_liu
Thanks Jark! I have been busy with performance tuning these past few days.
1. For this simple scenario I can work around it by adding the join key to the primary key of the sink table test_status,
i.e. using the id and status columns together as the key, so the data ends up eventually consistent. More complex statements feel hard to handle and I am hesitant to use them, mainly because it is unclear how this reordering affects other operators; it is easy to get wrong, and it really would be better handled inside the flink framework. Jark, do you have any advice on using flink
sql cdc here?

2. On performance, the default rocksdb parameters in flink really do perform poorly!
Following the articles you shared, I tuned some parameters (write buffer, block cache size) and switched to SSDs, which gave a good improvement. That explains why, after raising the parallelism, the CPU never seemed fully utilized: it was waiting on I/O. There still seems to be a lot of headroom here, but I have not yet found which parameters matter most.

3. Also, for performance monitoring, the metrics in the flink web
UI are a bit hard to use. Are there any best practices for prometheus+grafana? There are quite a lot of metrics and building a dashboard is tedious;
what is mainly needed is a ready-made dashboard configuration!




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql cdc: how to process the full snapshot only once

2020-11-26 Thread 俞剑波
By "changes" do you mean subsequent inserts, updates and deletes in the database? If so, just start the job from a checkpoint.

cljb...@163.com  于2020年11月27日周五 上午11:10写道:

> 之前一直使用streaming api,这两天开始使用sql。
> 有个疑问,flink sql  cdc读取mysql的数据时候,会处理 全量 + 增量数据。
> 那么如果同一个任务上线后,后续有变化,修改后再次上线,这个时候我并不希望处理之前过的数据。这个时候是怎么做呢?
>
>
> cdc里面有进行state保存消费过的changelog的位置吗?这样我重新上线的时候从savepoint或者checkpoint进行恢复,是不是就可以了?
>
> 感谢!
>
>
> cljb...@163.com
>


flink sql cdc: how to process the full snapshot only once

2020-11-26 Thread cljb...@163.com
I have been using the streaming api so far and started using sql in the last couple of days.
One question: when flink sql cdc reads data from mysql, it processes the full snapshot plus the incremental changes.
If the job later needs changes and is redeployed after being modified, I do not want to reprocess the data that has already been processed. How is that done?

Does cdc keep the position of the consumed changelog in state? If so, is it enough to restore from a savepoint or checkpoint when redeploying?

Thanks!


cljb...@163.com


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-26 Thread Jark Wu
Btw, I created an issue to track this problem:
https://issues.apache.org/jira/browse/FLINK-20374
Hope we can fix it in the next versions to have a better out-of-box
experience.

Best,
Jark

On Thu, 19 Nov 2020 at 13:58, Jark Wu  wrote:

> 如果数据本身没什么倾斜,且并发也能打上去。那在 sql 这边也没什么其他办法了。得从 rocksdb 的角度去调优看看。比如:
> 1. 是否有使用 SSD?
> 2. 调整 write buffer 和 block cache
> 3. 更多可以看下这些 state 调优文章[1][2].
>
> Best,
> Jark
>
> [1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
> [2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
>
> On Thu, 19 Nov 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:
>
>> 很感谢jark!
>> 1、昨天将status表设置成时态表(Temporal
>> Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
>>
>> 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。
>>
>> 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。
>>
>> 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
>> status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
>> status2, test 200 -- status200),source仍然存在着反压,只是并发度高的反压慢点出现一些,
>> 这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?
>>
>> 这个数据反压上,jark你有啥建议吗?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re:Re: Re: Re: flink sql cdc sum produces NULL results

2020-11-19 Thread kandy.wang
1. "Missing the initial full snapshot may cause problems"
How should I understand this? By default we consume from the kafka group offsets; how can the full data be guaranteed?
Our binlog sync is incremental only; it never does an initial full sync.
2. before is sent first, then after.


3. The data in kafka is hashed by the mysql primary key id, so it is ordered, but all records of a group key are not guaranteed to be in the same
partition, since hashing is by primary key id.








在 2020-11-20 13:25:53,"Jark Wu"  写道:
>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2. 没有开启
>>
>>
>>
>>
>> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
>> >实现上应该没什么问题。
>> >
>> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>> >2. 是否开启 mini-batch了?
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
>> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>> >>
>> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
>> update_before
>> >> update_after,format逻辑是应该这么写的吧。
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >> >值的,以验证你的自定义 format 没有问题。
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >> >
>> >> >> --mysql表
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>`spu_id` BIGINT NOT NULL,
>> >> >>`leaving_price`  DECIMAL(10, 5)
>> >> >>PRIMARY KEY ( `id` ),
>> >> >>unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> --flink表
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>`spu_id` BIGINT ,
>> >> >>`leaving_price`  DECIMAL(10, 5),
>> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>'url' = 'jdbc:mysql://...',
>> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>'username' = '...',
>> >> >>'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
>> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >> >>
>> >> >>
>> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> >> 有什么好的排查思路么?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>


Re: Re: Re: flink sql cdc sum produces NULL results

2020-11-19 Thread Jark Wu
1. Missing the initial full snapshot may cause problems.

3. When your format parses an update, does it emit the before row or the after row first?
4. Is your data ordered in kafka, i.e. are all records with the same key in the same partition?

On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:

>
>
>
>
>
>
> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>
> 2. 没有开启
>
>
>
>
> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
> >实现上应该没什么问题。
> >
> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
> >2. 是否开启 mini-batch了?
> >
> >Best,
> >Jark
> >
> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
> >
> >> hi Jark:
> >>
> >>
> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
> >>
> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
> update_before
> >> update_after,format逻辑是应该这么写的吧。
> >>
> >>
> >>
> >>
> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >> >值的,以验证你的自定义 format 没有问题。
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >> >
> >> >> --mysql表
> >> >> CREATE TABLE IF NOT EXISTS
> `mysql_realtime_leaving_price_spu_index_agg`(
> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >> >>`spu_id` BIGINT NOT NULL,
> >> >>`leaving_price`  DECIMAL(10, 5)
> >> >>PRIMARY KEY ( `id` ),
> >> >>unique key idx_spu_id (spu_id)
> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >> >>
> >> >>
> >> >> --flink表
> >> >> CREATE TABLE
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> (
> >> >>`spu_id` BIGINT ,
> >> >>`leaving_price`  DECIMAL(10, 5),
> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> >> ) WITH (
> >> >>   'connector' = 'jdbc',
> >> >>'url' = 'jdbc:mysql://...',
> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >> >>'username' = '...',
> >> >>'password' = '..'
> >> >> );
> >> >>
> >> >>
> >> >> --binlog 2mysql
> >> >>
> >> >> insert into
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> >>
> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >> >>
> >> >> FROM hive.database.table
> >> >>
> >> >> group by v_spu_id;
> >> >>
> >> >>
> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >> >>
> >> >>
> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> >> 有什么好的排查思路么?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>


Re:Re: Re: flink sql cdc sum produces NULL results

2020-11-19 Thread kandy.wang
1. Yes. When this job starts running it is stateless, and then it gradually accumulates state.

2. Not enabled.




在 2020-11-20 11:49:44,"Jark Wu"  写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>>
>> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
>> update_after,format逻辑是应该这么写的吧。
>>
>>
>>
>>
>> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >值的,以验证你的自定义 format 没有问题。
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >
>> >> --mysql表
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >>`spu_id` BIGINT NOT NULL,
>> >>`leaving_price`  DECIMAL(10, 5)
>> >>PRIMARY KEY ( `id` ),
>> >>unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> --flink表
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>`spu_id` BIGINT ,
>> >>`leaving_price`  DECIMAL(10, 5),
>> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>'url' = 'jdbc:mysql://...',
>> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>'username' = '...',
>> >>'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
>> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >>
>> >>
>> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> 有什么好的排查思路么?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>


Re: Re: flink sql cdc sum produces NULL results

2020-11-19 Thread Jark Wu
The implementation looks fine.

1. Does your cdc data skip the full snapshot, i.e. does it start reading directly from some day's incremental changes?
2. Is mini-batch enabled?

Best,
Jark

On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:

> hi Jark:
>
>
> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>
> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
> update_after,format逻辑是应该这么写的吧。
>
>
>
>
> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >值的,以验证你的自定义 format 没有问题。
> >
> >Best,
> >Jark
> >
> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >
> >> --mysql表
> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >>`spu_id` BIGINT NOT NULL,
> >>`leaving_price`  DECIMAL(10, 5)
> >>PRIMARY KEY ( `id` ),
> >>unique key idx_spu_id (spu_id)
> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >>
> >>
> >> --flink表
> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> (
> >>`spu_id` BIGINT ,
> >>`leaving_price`  DECIMAL(10, 5),
> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'jdbc',
> >>'url' = 'jdbc:mysql://...',
> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >>'username' = '...',
> >>'password' = '..'
> >> );
> >>
> >>
> >> --binlog 2mysql
> >>
> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >>
> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >>
> >> FROM hive.database.table
> >>
> >> group by v_spu_id;
> >>
> >>
> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >>
> >>
> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> 有什么好的排查思路么?
> >>
> >>
> >>
> >>
> >>
> >>
>


Re:Re: flink sql cdc sum produces NULL results

2020-11-19 Thread kandy.wang
hi Jark:

I added some logging and checked the fields involved in the aggregation: v_spu_id, leaving_num, price.
None of them is ever NULL, and checking in sql-client also shows no NULL values.

The custom format logic is similar to canal: insert, update, delete, where an update message needs to emit two records, update_before and
update_after. That should be the right way to write the format logic, correct?




在 2020-11-19 23:13:19,"Jark Wu"  写道:
>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5)
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>


Re: flink sql cdc sum produces NULL results

2020-11-19 Thread Jark Wu
You can first run select * from hive.database.table; directly and check whether every field has normal, correct values and whether any are null,
to verify that your custom format is fine (a concrete sketch follows below).

Best,
Jark

On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:

> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>`id` INT UNSIGNED AUTO_INCREMENT,
>`spu_id` BIGINT NOT NULL,
>`leaving_price`  DECIMAL(10, 5)
>PRIMARY KEY ( `id` ),
>unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>
>
> --flink表
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>`spu_id` BIGINT ,
>`leaving_price`  DECIMAL(10, 5),
> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>'url' = 'jdbc:mysql://...',
>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>'username' = '...',
>'password' = '..'
> );
>
>
> --binlog 2mysql
>
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>
> FROM hive.database.table
>
> group by v_spu_id;
>
>
> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>
>
> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> 有什么好的排查思路么?
>
>
>
>
>
>

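A minimal sketch of the check suggested above, narrowed to the three fields used in the aggregation. The table name hive.database.table is the one from this thread; run it in sql-client before wiring up the sink:

SELECT v_spu_id, leaving_num, price
FROM hive.database.table
WHERE v_spu_id IS NULL OR leaving_num IS NULL OR price IS NULL;

-- If this returns rows, the custom format is already producing NULL fields;
-- if it returns nothing, the NULLs are more likely introduced downstream,
-- e.g. by retraction ordering or by the missing initial snapshot.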

flink sql cdc sum produces NULL results

2020-11-19 Thread kandy.wang
--mysql table
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `spu_id` BIGINT NOT NULL,
   `leaving_price`  DECIMAL(10, 5)
   PRIMARY KEY ( `id` ),
   unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8


--flink table
CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
   `spu_id` BIGINT ,
   `leaving_price`  DECIMAL(10, 5),
PRIMARY KEY ( `spu_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://...',
   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
   'username' = '...',
   'password' = '..'
);


--binlog to mysql

insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg

SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price

FROM hive.database.table

group by v_spu_id;


hive.database.table points to our company's own kafka, which implements the same 4 message types as the canal cdc format.


The problem is that the leaving_price column in the mysql result table is very often NULL, while looking at the data the leaving_num and price
fields can never be NULL. I do not understand why the result table can be NULL.
Any good ideas for troubleshooting this?







Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-18 Thread Jark Wu
If the data itself is not skewed and the parallelism can be raised, then there is not much more to do on the sql side; the tuning has to happen on the rocksdb side. For example:
1. Are you using SSDs?
2. Tune the write buffer and the block cache.
3. For more, see these state tuning articles [1][2].

Best,
Jark

[1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
[2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA

On Thu, 19 Nov 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:

> 很感谢jark!
> 1、昨天将status表设置成时态表(Temporal
> Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
>
> 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。
>
> 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。
>
> 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
> status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
> status2, test 200 -- status200),source仍然存在着反压,只是并发度高的反压慢点出现一些,
> 这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?
>
> 这个数据反压上,jark你有啥建议吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-18 Thread jindy_liu
Many thanks, Jark!
1. Yesterday I made the status table a temporal table (Temporal
Tables) and tried the continuous join. It does have the problem you described: updates to the status table do not trigger any computation, and all real-time changes have to be driven by the test stream.
There is also the temporal table TTL setting: too small causes I/O problems, too large makes results stale, neither of which fits our requirements. In our scenario the status table is not really a dimension table; it is large and changes a lot.

2. With parallelism 1, the obvious problem is performance: with large tables the join causes heavy backpressure.

3. In the tests with higher parallelism (10, 20, 40, 80) I made the data of the two joined tables (test,
status) completely symmetric, with no skew (2 million rows each, and symmetric join keys: test 1 -- status1, test 2 --
status2, ..., test 200 -- status200). The source still gets backpressured; at higher parallelism the backpressure just appears a bit later.
The flink state backend here is rocksdb + local files. Disk I/O is only around 20k blocks/s; could it be that the default rocksdb configuration simply is not fast enough?

Do you have any suggestions for this backpressure, Jark?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-18 Thread Jark Wu
I looked at your problem again more carefully. Your join key is the status id, so records are currently shuffled by status id and distributed to
different join subtasks.
If the status id of a row in the test table changes, the records of that test id will be processed by different join subtasks, which means the test
data is already out of order.
At that point, even adding a keyBy on the sink key downstream no longer helps.

So when joining two cdc streams, make sure the join key cannot change; otherwise the join has to run with parallelism 1.
For your scenario, you could consider a dimension (lookup) table join against the status table, because a lookup join does not shuffle by the join key, so the
test table data stays in order (a sketch follows this message).
However, updates to the status table will then no longer trigger computation and update the sink table; only updates to the test table will trigger computation and update the sink table.

Best,
Jark



On Mon, 16 Nov 2020 at 16:03, jindy_liu <286729...@qq.com> wrote:

> 1、试了下
>
> 在test表中增加一个proctime
>
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `proctime` AS PROCTIME(),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'no_lock',
>   'password' = 'no_lock',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test',
>   'debezium.snapshot.locking.mode' = 'none'
> );
>
> 写去重语句,
>
> INSERT into test_status_print
> SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
> FROM (
> SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as
> rowNum
> FROM (
> SELECT t.* , s.name as status_name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id
> )
> )r WHERE rowNum = 1;
>
> 但提示报错,不支持:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Deduplicate doesn't support
> consuming update and delete changes which is produced by node
> Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
> time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

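A minimal sketch of the lookup-join shape described above. It assumes the status table is redefined with the 'jdbc' connector so it can serve as a lookup (dimension) table, and that test carries a processing-time attribute (`proctime` AS PROCTIME()); connection values are placeholders from this thread, not a verified configuration:

CREATE TABLE status_dim (
    `id` INT,
    `name` VARCHAR(255),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/ai_audio_lyric_task',
    'table-name' = 'status',
    'username' = 'root',
    'password' = '1'
);

-- FOR SYSTEM_TIME AS OF makes this a lookup join: each test row looks up the
-- current status row, so there is no shuffle by the join key and the test
-- stream keeps its order. The trade-off, as noted above, is that changes to
-- status alone no longer update the sink.
INSERT INTO test_status
SELECT t.id, t.name, t.`time`, t.status, s.name
FROM test AS t
LEFT JOIN status_dim FOR SYSTEM_TIME AS OF t.proctime AS s
    ON t.status = s.id;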

Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-16 Thread jindy_liu
1. I gave it a try.

Added a proctime column to the test table:

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`proctime` AS PROCTIME(),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'no_lock',
  'password' = 'no_lock',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test',
  'debezium.snapshot.locking.mode' = 'none'
);

Then wrote the deduplication statement:

INSERT into test_status_print 
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as 
rowNum
FROM (
SELECT t.* , s.name as status_name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id
)
)r WHERE rowNum = 1;

But it fails with an error saying this is not supported:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Deduplicate doesn't support
consuming update and delete changes which is produced by node
Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread Jark Wu
1.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
2. That is a 1.12 feature; it is defined as a WITH property in the sink DDL (a sketch follows this message).

On Mon, 16 Nov 2020 at 14:18, jindy_liu <286729...@qq.com> wrote:

> 哦,这样啊
> 1、加上一个 deduplicate by sink key 节点在sql中是怎么写的?
> 2、另外sql 中有关键字能单独指定一条sink sql的并发度吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

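For reference, a sketch of what that 1.12-style sink DDL property could look like on the Elasticsearch sink used in this thread. It assumes the connector in use supports the 'sink.parallelism' option in 1.12; the other options are placeholders copied from the thread, not a verified configuration:

-- Pinning the sink to a single subtask means +I/-D records for one key cannot
-- be reordered across sink tasks; in 1.12 a differing sink parallelism also
-- makes the planner insert a shuffle by the primary key before the sink.
CREATE TABLE test_status (
    `id` INT,
    `name` VARCHAR(255),
    `time` TIMESTAMP(3),
    `status` INT,
    `status_name` VARCHAR(255),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'xxx',
    'index' = 'xxx',
    'sink.parallelism' = '1'
);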

Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread jindy_liu
Oh, I see.
1. How do I write a "deduplicate by sink key" node in sql?
2. Also, is there a keyword in sql to set the parallelism of a single sink statement separately?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread jindy_liu
The image is a png and may not display, so here is the text version:
1. The last few lines of the print output.

32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2. You can also see that most of the data was processed on join subtask 32: all 2 million rows were handled by a single subtask!

Also correcting a typo: ===>
After the snapshot, the data in test_status is correct:
0, jindy0, 2020-07-06T20:01:15 , 0, statu0
1, jindy2, 2020-11-12T00:00:02 , 1, statu1
2, jindy2, 2020-07-03T18:04:22 , 2, statu2




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread Jark Wu
If you changed the status join field on the test table, then this behaviour is expected. Your original example did not change the status field.

The essence of the problem is that the join key and your final sink key are different, which can cause reordering.
It should be enough to explicitly shuffle by the sink key before the sink, for example by adding a deduplicate-by-sink-key node.
Alternatively, in version 1.12, as long as the sink parallelism differs from the parallelism of the upstream node, the framework automatically adds a shuffle by the sink key.

As for the hotspot on the join node you mentioned, that is because you have too few status keys, which causes severe data skew.





On Mon, 16 Nov 2020 at 12:03, jindy_liu <286729...@qq.com> wrote:

> 怕图片看不清,
> 我文字补充下:
> 1、print的最后几行。
>
> 32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
> 32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
> 32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
> 32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
> 32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
> 32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
> 32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
> 32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
> 32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
> 36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
> 32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
> 30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
> 36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
> 36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
> 30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)
>
> 2、同时可以看出,大部分数据都在join的32这个subtask上做了处理。200w行处理都在一个subtask做了!
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread jindy_liu
In case the image is hard to read,
here is the text version:
1. The last few lines of the print output.

32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2. You can also see that most of the data was processed on join subtask 32: all 2 million rows were handled by a single subtask!





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread jindy_liu
The image was a screenshot in png format; I forgot to add the file extension.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Thread jindy_liu
I tried again: the problem also occurs without restarting the job; it appears as soon as the parallelism is greater than 1!

1. Started directly in sql-client: /data/home/jindyliu/flink-demo/flink-1.11.2/bin/sql-client.sh
embedded -d 
/data/home/jindyliu/flink-demo/flink-1.11.2//conf/sql-client-defaults.yaml
with the parallelism in sql-client-defaults.yaml set to 40.

The data is the same: the test table has 2 million rows and the status table has 11 rows.

Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
Source table status:
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

//output table
CREATE TABLE test_status_print (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'print'
);

//join
INSERT into test_status_print 
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

To reproduce: after the mysql-cdc snapshot finishes, change the status field in the test table and the ordering problem appears. I printed the output with the print connector.
After the snapshot, the data in test_status is correct:
0, jindy0, 2020-07-06T20:01:15 , 0, statu0
1, jindy2, 2020-11-12T00:00:02 , 1, statu2
2, jindy2, 2020-07-03T18:04:22 , 2, statu3

After the snapshot, change the status value of the rows with id = 0, 1, 2 in the mysql table to 3. The expected result:
0, jindy0, 2020-07-06T20:01:15 , 3, statu3
1, jindy2, 2020-11-12T00:00:02 , 3, statu3
2, jindy2, 2020-07-03T18:04:22 , 3, statu3
But there is an ordering problem in the output, which causes the two records with id = 0 and 2 to go missing from test_status.

1. print output: (screenshot attachment omitted; the text version is in the follow-up message)


ps:
Another thing I noticed: when the source data is sent to the join operator, there seems to be no effective hash distribution; almost everything is processed on a single node. Why is that? It feels like the join operator will become a bottleneck!!! and very easily cause backpressure?!

@jark, could you take a look? My version is Version: 1.11.2 Commit: fe36135 @
2020-09-09T16:19:03+02:00, downloaded from the official website.







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-14 Thread Jark Wu
Sorry... I did not read the subject carefully and only now realised you are talking about the es sink; that is not the same issue as the bug I mentioned above.

In theory, though, this should not happen.
On my local 1.11 branch, using the data and sql you provided, I could not reproduce the problem either.
Could the sql be wrong? I see the definition of your test_status table is missing a comma before the pk..

Best,
Jark

On Sat, 14 Nov 2020 at 17:48, Jark Wu  wrote:

> 看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423
> 这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。
> 这个bug 会在即将发布的 1.11.3 中修复。
>
> Best,
> Jark
>
>
>
>
> On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote:
>
>> 源表test:
>> CREATE TABLE test (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'test'
>> )
>> 源表status
>> CREATE TABLE status (
>> `id` INT,
>> `name` VARCHAR(255),
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'status'
>> );
>>
>> 输出表
>> CREATE TABLE test_status (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> `status_name` VARCHAR(255)
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'xxx',
>>   'index' = 'xxx',
>>   'username' = 'xxx',
>>   'password' = 'xxx',
>>   'sink.bulk-flush.backoff.max-retries' = '10',
>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>>   'sink.bulk-flush.max-actions' = '5000',
>>   'sink.bulk-flush.max-size' = '10mb',
>>   'sink.bulk-flush.interval' = '1s'
>> );
>>
>>
>> 输出语句:
>> INSERT into test_status
>> SELECT t.*, s.name
>> FROM test AS t
>> LEFT JOIN status AS s ON t.status = s.id;
>>
>> mysql表中已经有数据
>> test:
>> 0, name0, 2020-07-06 00:00:00 , 0
>> 1, name1, 2020-07-06 00:00:00 , 1
>> 2, name2, 2020-07-06 00:00:00 , 1
>> .
>>
>> status
>> 0, status0
>> 1, status1
>> 2, status2
>> .
>>
>> 操作顺序与复现:
>> 1、启动任务,设置并行度为40,
>> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
>> savepoint保存,然后web ui上取消任务。
>>   ==> test_status中的数据正常:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>> 1, name1, 2020-07-06 00:00:00 , 1, status1
>> 2, name2, 2020-07-06 00:00:00 , 1, status1
>>
>> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>>
>> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
>> job  下,
>>   ==> test_status中的数据正常:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>> 1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>> 2, name2, 2020-07-06 00:00:00 , 1, status1_modify
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
>> job  下
>>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>>
>>
>> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>>
>> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
>> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-14 Thread Jark Wu
This looks like it is caused by this jdbc sink bug: https://issues.apache.org/jira/browse/FLINK-19423
The bug makes the sink pick the wrong pk index on delete, which can lead to index errors or to deleting the wrong data.
It will be fixed in the upcoming 1.11.3 release.

Best,
Jark




On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote:

> 源表test:
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> )
> 源表status
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
>
> 输出表
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255)
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '10',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
>
>
> 输出语句:
> INSERT into test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
>
> mysql表中已经有数据
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .
>
> status
> 0, status0
> 1, status1
> 2, status2
> .
>
> 操作顺序与复现:
> 1、启动任务,设置并行度为40,
> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
> savepoint保存,然后web ui上取消任务。
>   ==> test_status中的数据正常:
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1
> 2, name2, 2020-07-06 00:00:00 , 1, status1
>
> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>
> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
> job  下,
>   ==> test_status中的数据正常:
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1_modify
> 2, name2, 2020-07-06 00:00:00 , 1, status1_modify
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
> job  下
>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
> 0, name0, 2020-07-06 00:00:00 , 0, status0
>
>
> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>
> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink 1.11.2 cdc: sink ordering issue for the cdc sql result table; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-12 Thread jindy_liu
Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
Source table status:
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

Output table:
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255)
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '10',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);


Output statement:
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

The mysql tables already contain data.
test: 
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.

status
0, status0
1, status1
2, status2
.

Steps and reproduction:
1. Start the job with parallelism 40.
After the data in the tables has been processed, save a savepoint with /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
savepoint, then cancel the job in the web ui.
  ==> the data in test_status is correct:
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1
2, name2, 2020-07-06 00:00:00 , 1, status1

2. In mysql, change the status row with id=1 to status1_modify.

3. Then restart the job above with different parallelism, 1 and greater than 1. With parallelism greater than 1 the result does not match expectations.
Under /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
job,
  ==> the data in test_status is correct:
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1_modify
2, name2, 2020-07-06 00:00:00 , 1, status1_modify
Under /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
job,
  ==> the data in test_status is wrong: the two rows with id = 1 and 2 are missing:
0, name0, 2020-07-06 00:00:00 , 0, status0


I suspect that when the cdc changelog stream is written out by the sink with parallelism greater than 1, the insert for id=1 is executed before the delete for id=1, which corrupts the data!!!

Is this a bug? Or is the operator state wrong when restoring from the savepoint?
If so, is it possible to set the parallelism to 1 only for the sink??







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink sql cdc lock timeout

2020-11-10 Thread 丁浩浩
When I use flink cdc to run a join query over several tables, one of the tables always hits a lock wait timeout, so the job cannot start properly.
How should this be handled? (A sketch follows the stack trace below.)
org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; 
try restarting transaction Error code: 1205; SQLSTATE: 40001.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock 
wait timeout exceeded; try restarting transaction
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at 
com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:782)
at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:666)
at 
io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1201)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:465)
... 3 more
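
One knob that appears elsewhere in this archive (in jindy_liu's mysql-cdc DDL) and may be relevant here: the mysql-cdc connector passes Debezium options through, so the snapshot locking behaviour can be relaxed. A sketch only; the table and connection values are placeholders, and whether 'none' is safe depends on whether the schema can change during the snapshot:

CREATE TABLE orders_src (
    `id` INT,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '1',
    'database-name' = 'db',
    'table-name' = 'orders',
    -- pass-through Debezium option: skip acquiring the table read lock during
    -- the initial snapshot, which avoids "Lock wait timeout exceeded"
    'debezium.snapshot.locking.mode' = 'none'
);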


Re: flink 1.11 cdc: how to convert a DataStream into a table for flink sql cdc?

2020-11-05 Thread jindy_liu
OK, thanks Jark!
The data does contain deletes, so I will look at implementing the source approach. Originally I only wanted to do the merging with a mapfunction on top and then convert!
It seems there is no way around implementing a sql connector. The source is kafka, so it looks like I need to find a way to adapt the kafka stream's KafkaDynamicSource!!!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 cdc: how to convert a DataStream into a table for flink sql cdc?

2020-11-04 Thread Jark Wu
Attaching the deduplication documentation link:
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

On Thu, 5 Nov 2020 at 12:01, Jark Wu  wrote:

> 1. Registering a DataStream of RowData is currently not supported, because RowData is recognized as an unstructured type.
> 2. Registering a cdc stream is not supported either; that is, DataStream -> Table only supports insert-only
> streams and cannot recognize a cdc stream. This feature is planned for version 1.13.
>
> For your scenario there are the following options:
> 1. If your stream only contains inserts and updates, no deletes: first register the DataStream as an insert-only Table, then use
> Flink SQL's deduplication syntax [1] to keep only the last record per pk (a sketch follows this message).
> 2. If your stream contains deletes, then you have to develop your own sql connector and implement the cdc
> capture, plus the "mapfunction imposes some ordering constraints on the stream" logic, inside your source.
>
> Best,
> Jark
>
>
>
> On Thu, 5 Nov 2020 at 10:07, jindy_liu <286729...@qq.com> wrote:
>
>> 目前有两个DataStream的流,通过mapfunction,
>> 转成DataStream流,请问DataStream怎么转成table,并使用flink sql进行操作。
>> *(注:因为mapfunction对流做了些顺序的限制,目前无法无法直接用flink sql cdc直接定义表!!!)*
>>
>> *目前我的做法会报错:*
>>
>> StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
>> fsSettings);
>>
>> DataStreamSource json1 // canal json的格式
>> DataStreamSource json2  // canal json的格式
>> ConnectedStreams connect=
>> caliber_cdc_json.connect(caliber_snapshot_json); //connect
>> DataStream snapshot_cdc_stream = connect.flatMap(
>> new SnapshotCdcCoRichFlatMapFunction()
>> ); //做连接
>>
>> //3, 注册表,将表数据,直接输出
>> Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
>> fsTableEnv.createTemporaryView("test", snapshot_cdc_table);
>>
>> String output = "CREATE TABLE test_mirror (\n" +
>> "`id` INT,\n" +
>> "`name` VARCHAR(255),\n" +
>> "`time` TIMESTAMP(3),\n" +
>> "PRIMARY KEY(id) NOT ENFORCED\n" +
>> ") WITH (\n" +
>> "  'connector' = 'print'\n" +
>> ")";
>>
>>  //4, app logic
>> String op = "INSERT into test_mirror SELECT * from test";
>> fsTableEnv.executeSql(output);
>> fsTableEnv.executeSql(op);
>>
>>
>> *但提交任务失败,错误信息:*
>> serializationSchema:root
>>  |-- id: INT NOT NULL
>>  |-- name: VARCHAR(255)
>>  |-- time: TIMESTAMP(3)
>>  |-- status: INT
>>  |-- CONSTRAINT PK_3386 PRIMARY KEY (id)
>>
>> snapshot_cdc_table:UnnamedTable$0
>> ++
>> | table name |
>> ++
>> | UnnamedTable$0 |
>> |   test |
>> |test_mirror |
>> ++
>> 3 rows in set
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method
>> caused an error: A raw type backed by type information has no serializable
>> string representation. It needs to be resolved into a proper raw type.
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at
>> org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown
>> Source)
>> at
>>
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> *Caused by: org.apache.flink.table.api.TableException: A raw type backed
>> by
>> type information has no serializable string representation. It needs to be
>> resolved into a proper raw type.*
>> at
>>
>> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
>> at
>>
>> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
>> at
>>
>> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSi

flink 1.11 cdc: How can a DataStream be converted into a flink sql cdc table?

2020-11-04 Thread jindy_liu
I currently have two DataStream streams that are merged through a mapfunction
into a single DataStream. How can this DataStream be converted into a table and then operated on with flink sql?
*(Note: because the mapfunction imposes some ordering constraints on the stream, I currently cannot define the table directly with flink sql cdc!!!)*

*My current approach reports an error:*

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
fsSettings);

DataStreamSource json1 // canal json format
DataStreamSource json2  // canal json format
ConnectedStreams connect=
caliber_cdc_json.connect(caliber_snapshot_json); // connect the two streams
DataStream snapshot_cdc_stream = connect.flatMap(
new SnapshotCdcCoRichFlatMapFunction()
); // merge the connected streams

//3, register the table and output the table data directly
Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n" +
"`id` INT,\n" +
"`name` VARCHAR(255),\n" +
"`time` TIMESTAMP(3),\n" +
"PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
"  'connector' = 'print'\n" +
")";

 //4, app logic
String op = "INSERT into test_mirror SELECT * from test";
fsTableEnv.executeSql(output);
fsTableEnv.executeSql(op);


*But the job submission fails with the following error:*
serializationSchema:root
 |-- id: INT NOT NULL
 |-- name: VARCHAR(255)
 |-- time: TIMESTAMP(3)
 |-- status: INT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0
++
| table name |
++
| UnnamedTable$0 |
|   test |
|test_mirror |
++
3 rows in set


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: A raw type backed by type information has no serializable
string representation. It needs to be resolved into a proper raw type.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown
Source)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
*Caused by: org.apache.flink.table.api.TableException: A raw type backed by
type information has no serializable string representation. It needs to be
resolved into a proper raw type.*
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.

flink sql cdc job submission error

2020-11-02 Thread flink小猪
When I submit the flink sql job to the cluster, I get the error below.
At first it looks like a missing dependency jar, but when I open the jar with a decompiler the dependency is present. What could the problem be, and how should I go about fixing it?

Re: Re: flink mysql cdc + hive streaming question

2020-11-02 Thread 陈帅
Thanks everyone for the patient answers — the problem has finally been found.

After enabling debug logging and setting a breakpoint in the commitablePartitions method of the PartitionTimeCommitTrigger class, I found that when line 127,
LocalDateTime partTime = extractor.extract(partitionKeys,
extractPartitionValues(new Path(partition)));
was executed, it threw java.time.format.DateTimeParseException: Text '20201101 17:14:00'
could not be parsed at index 8
Tracing it down, the partition commit failed because of the time format. The problematic line of code was
" DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), '*yyyyMMdd*')
as dt, \n" +
and it should be changed to
" DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), '
*yyyy-MM-dd*') as dt, \n" +

Checking the Flink documentation for the 'partition.time-extractor.timestamp-pattern' parameter, the supported format is
yyyy-MM-dd HH:mm:ss, so the dt part should be written as yyyy-MM-dd, not yyyyMMdd:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html

Thanks again everyone!
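For clarity, a sketch of the corrected combination with the default time extractor (the column list mirrors the team table in this thread; the source view name team_with_op is hypothetical, standing in for the changelog stream registered as a table):

-- 'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00' expects dt in yyyy-MM-dd form
INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time, op,
       DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') AS dt,
       DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') AS hr,
       DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') AS mi
FROM team_with_op;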

hdxg1101300...@163.com  于2020年11月2日周一 下午4:07写道:

> 这个utc时间怎么设置,不能查看到hive数据的根本原因是 分区信息没有更新到metastore ;
> 你会发现文件生成了但是没有 _SUCCESS文件;
> 但是这样指定也不行??
> tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
>
> 它的增删改只是在hive中对数据做了标记;后面可以通过join来处理数据
>
>
>
> hdxg1101300...@163.com
>
> 发件人: chengyanan1...@foxmail.com
> 发送时间: 2020-11-02 13:37
> 收件人: user-zh
> 主题: Re: Re: flink mysql cdc + hive streaming疑问
> 你好!
> 看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢?
>
>
>
>
> 发件人: Rui Li
> 发送时间: 2020-11-02 10:38
> 收件人: user-zh
> 抄送: Jark Wu
> 主题: Re: flink mysql cdc + hive streaming疑问
> Hi,
> 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
> LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
> LOG.info("Committed partition {} to metastore", partitionSpec);
> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
> On Sun, Nov 1, 2020 at 5:36 PM 陈帅  wrote:
> > 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> > streaming不能自动注册hive分区吗?还是我使用的姿势不对?
> >
> > 陈帅  于2020年11月1日周日 下午5:24写道:
> >
> > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > > ") STORED AS TEXTFILE TBLPROPERTIES ("
> > >
> > > 这是生成的hive表建表语句
> > >
> > > hive> show create table team;
> > > OK
> > > CREATE TABLE `team`(
> > >   `team_id` int,
> > >   `team_name` string,
> > >   `create_time` string,
> > >   `update_time` string,
> > >   `op` string)
> > > PARTITIONED BY (
> > >   `dt` string,
> > >   `hr` string,
> > >   `mi` string)
> > > ROW FORMAT SERDE
> > >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > > STORED AS INPUTFORMAT
> > >   'org.apache.hadoop.mapred.TextInputFormat'
> > > OUTPUTFORMAT
> > >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > > LOCATION
> > >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > > TBLPROPERTIES (
> > >   'is_generic'='false',
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> > >   'sink.partition-commit.delay'='1 min',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'transient_lastDdlTime'='160466')
> > > Time taken: 0.252 seconds, Fetched: 25 row(s)
> > >
> > > 另外,下载了hive文件内容如下
> > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> > 11:25:38<0x01>INSERT
> > >
> > > 还是查询不到结果
> > > hive> select * from team;
> > > OK
> > > Time taken: 0.326 seconds
> > >
> > > 陈帅  于2020年11月1日周日 下午5:10写道:
> > >
> > >>
> > >>
> >
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> > >> 生成的hive分区文件路径类似于
> > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> > >>
> > >> 陈帅  于2020年11月1日周日 下午4:43写道:
> > >>
> > >>>
> >
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> > >>> shell查不到数据。
> > >>>
> > >>> import com.alibaba.fastjson.JSON;
> > >>> import com.alibaba.fastjson.JSONObject;
> > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >>> import org.apache.flink.api.common.typeinfo.Types;
> > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>

Re: Re: flink mysql cdc + hive streaming question

2020-11-02 Thread hdxg1101300...@163.com
How is this UTC time supposed to be set? The root cause of not being able to see the data in Hive is that the partition information was not updated to the metastore;
you will find that the files are generated but there is no _SUCCESS file;
but specifying it like this does not work either??
tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));

Its inserts/updates/deletes are only recorded as a marker column on the data in Hive; the data can be merged later with a join.



hdxg1101300...@163.com
 
发件人: chengyanan1...@foxmail.com
发送时间: 2020-11-02 13:37
收件人: user-zh
主题: Re: Re: flink mysql cdc + hive streaming疑问
你好!
看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢?
 
 
 
 
发件人: Rui Li
发送时间: 2020-11-02 10:38
收件人: user-zh
抄送: Jark Wu
主题: Re: flink mysql cdc + hive streaming疑问
Hi,
正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
LOG.info("Partition {} of table {} is ready to be committed",
partSpec, tableIdentifier);
LOG.info("Committed partition {} to metastore", partitionSpec);
LOG.info("Committed partition {} with success file", context.partitionSpec());
On Sun, Nov 1, 2020 at 5:36 PM 陈帅  wrote:
> 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> streaming不能自动注册hive分区吗?还是我使用的姿势不对?
>
> 陈帅  于2020年11月1日周日 下午5:24写道:
>
> > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > ") STORED AS TEXTFILE TBLPROPERTIES ("
> >
> > 这是生成的hive表建表语句
> >
> > hive> show create table team;
> > OK
> > CREATE TABLE `team`(
> >   `team_id` int,
> >   `team_name` string,
> >   `create_time` string,
> >   `update_time` string,
> >   `op` string)
> > PARTITIONED BY (
> >   `dt` string,
> >   `hr` string,
> >   `mi` string)
> > ROW FORMAT SERDE
> >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > STORED AS INPUTFORMAT
> >   'org.apache.hadoop.mapred.TextInputFormat'
> > OUTPUTFORMAT
> >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > LOCATION
> >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > TBLPROPERTIES (
> >   'is_generic'='false',
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> >   'sink.partition-commit.delay'='1 min',
> >   'sink.partition-commit.policy.kind'='metastore,success-file',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'transient_lastDdlTime'='160466')
> > Time taken: 0.252 seconds, Fetched: 25 row(s)
> >
> > 另外,下载了hive文件内容如下
> > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> 11:25:38<0x01>INSERT
> >
> > 还是查询不到结果
> > hive> select * from team;
> > OK
> > Time taken: 0.326 seconds
> >
> > 陈帅  于2020年11月1日周日 下午5:10写道:
> >
> >>
> >>
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> >> 生成的hive分区文件路径类似于
> /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> >>
> >> 陈帅  于2020年11月1日周日 下午4:43写道:
> >>
> >>>
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> >>> shell查不到数据。
> >>>
> >>> import com.alibaba.fastjson.JSON;
> >>> import com.alibaba.fastjson.JSONObject;
> >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >>> import org.apache.flink.api.common.typeinfo.Types;
> >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> >>> import org.apache.flink.streaming.api.CheckpointingMode;
> >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> >>> import org.apache.flink.streaming.api.datastream.DataStream;
> >>> import
> >>>
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> >>> import
> >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>> import
> >>>
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> >>> import org.apache.flink.streaming.api.windowing.time.Time;
> >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >>> import
> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> >>> import org.apache.flink.table.api.EnvironmentSettings;
> >>> import org.apache.flink.table.api.SqlDialect;
> >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> >>> import org.apache.flink.types.Row;
> >>> import org.apache.flink.types.RowKind;
> >>>
> >>> i

Re: Re: flink mysql cdc + hive streaming question

2020-11-01 Thread chengyanan1...@foxmail.com
Hi!
I see that in your code the insert/update/delete information is stored as a column in the Hive table. How do these operations end up being merged?




 
From: Rui Li
Sent: 2020-11-02 10:38
To: user-zh
Cc: Jark Wu
Subject: Re: flink mysql cdc + hive streaming question
Hi,

Under normal circumstances partitions are committed automatically. I see your commit policy specifies metastore+success-file, so check whether the success
file was created under the partition directory. If there is no success file either, partition commit was never triggered. Also, when a partition is committed, logs like the following are printed; you can search for them in the log:
 
LOG.info("Partition {} of table {} is ready to be committed",
partSpec, tableIdentifier);
 
LOG.info("Committed partition {} to metastore", partitionSpec);
 
LOG.info("Committed partition {} with success file", context.partitionSpec());
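A quick way to cross-check from the Hive side (table and path names follow the example in this thread and may differ in your environment):

USE ods;
SHOW PARTITIONS team;      -- partitions the metastore currently knows about
MSCK REPAIR TABLE team;    -- register partition directories present on HDFS but missing from the metastore
-- in the hive CLI you can also list a partition directory and look for the _SUCCESS file, e.g.
-- dfs -ls /user/hive/warehouse/ods.db/team/dt=2020-11-01/hr=16/mi=30;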
 
 
On Sun, Nov 1, 2020 at 5:36 PM 陈帅  wrote:
 
> 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> streaming不能自动注册hive分区吗?还是我使用的姿势不对?
>
> 陈帅  于2020年11月1日周日 下午5:24写道:
>
> > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > ") STORED AS TEXTFILE TBLPROPERTIES ("
> >
> > 这是生成的hive表建表语句
> >
> > hive> show create table team;
> > OK
> > CREATE TABLE `team`(
> >   `team_id` int,
> >   `team_name` string,
> >   `create_time` string,
> >   `update_time` string,
> >   `op` string)
> > PARTITIONED BY (
> >   `dt` string,
> >   `hr` string,
> >   `mi` string)
> > ROW FORMAT SERDE
> >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > STORED AS INPUTFORMAT
> >   'org.apache.hadoop.mapred.TextInputFormat'
> > OUTPUTFORMAT
> >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > LOCATION
> >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > TBLPROPERTIES (
> >   'is_generic'='false',
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> >   'sink.partition-commit.delay'='1 min',
> >   'sink.partition-commit.policy.kind'='metastore,success-file',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'transient_lastDdlTime'='160466')
> > Time taken: 0.252 seconds, Fetched: 25 row(s)
> >
> > 另外,下载了hive文件内容如下
> > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> 11:25:38<0x01>INSERT
> >
> > 还是查询不到结果
> > hive> select * from team;
> > OK
> > Time taken: 0.326 seconds
> >
> > 陈帅  于2020年11月1日周日 下午5:10写道:
> >
> >>
> >>
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> >> 生成的hive分区文件路径类似于
> /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> >>
> >> 陈帅  于2020年11月1日周日 下午4:43写道:
> >>
> >>>
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> >>> shell查不到数据。
> >>>
> >>> import com.alibaba.fastjson.JSON;
> >>> import com.alibaba.fastjson.JSONObject;
> >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >>> import org.apache.flink.api.common.typeinfo.Types;
> >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> >>> import org.apache.flink.streaming.api.CheckpointingMode;
> >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> >>> import org.apache.flink.streaming.api.datastream.DataStream;
> >>> import
> >>>
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> >>> import
> >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>> import
> >>>
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> >>> import org.apache.flink.streaming.api.windowing.time.Time;
> >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >>> import
> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> >>> import org.apache.flink.table.api.EnvironmentSettings;
> >>> import org.apache.flink.table.api.SqlDialect;
> >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> >>> import org.apache.flink.types.Row;
> >>> import org.apache.flink.types.RowKind;
> >>>
> >>> import java.time.Duration;
> >>> import java.time.Instant;
> >>> import java.time.LocalDateTime;
> >>> import java.time.ZoneId;
> >>> import java.time.format.DateTimeFormatter;
> >>> import java.util.Properties;
> >>>
> >>> public class MysqlCDC

Re: flink mysql cdc + hive streaming question

2020-11-01 Thread Jingsong Li
- You can use proc-time
- Or add a **watermark in the UTC time zone** on your source. Note that it must be **UTC**: SQL watermarks are all **UTC**-based.
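A minimal sketch of such a declaration (purely illustrative, not taken from this thread; it assumes the source timestamps are local GMT+8 wall-clock time and a plain JSON topic):

CREATE TABLE kafka_team_src (
  team_id INT,
  team_name STRING,
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
  ts AS create_time - INTERVAL '8' HOUR,          -- shift local time back to UTC
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND    -- the watermark is then in UTC, as SQL expects
) WITH (
  'connector' = 'kafka',
  'topic' = 'team',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);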

On Mon, Nov 2, 2020 at 10:38 AM Rui Li  wrote:

> Hi,
>
> 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
>
> LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
>
> LOG.info("Committed partition {} to metastore", partitionSpec);
>
> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
>
>
> On Sun, Nov 1, 2020 at 5:36 PM 陈帅  wrote:
>
> > 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> > streaming不能自动注册hive分区吗?还是我使用的姿势不对?
> >
> > 陈帅  于2020年11月1日周日 下午5:24写道:
> >
> > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > > ") STORED AS TEXTFILE TBLPROPERTIES ("
> > >
> > > 这是生成的hive表建表语句
> > >
> > > hive> show create table team;
> > > OK
> > > CREATE TABLE `team`(
> > >   `team_id` int,
> > >   `team_name` string,
> > >   `create_time` string,
> > >   `update_time` string,
> > >   `op` string)
> > > PARTITIONED BY (
> > >   `dt` string,
> > >   `hr` string,
> > >   `mi` string)
> > > ROW FORMAT SERDE
> > >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > > STORED AS INPUTFORMAT
> > >   'org.apache.hadoop.mapred.TextInputFormat'
> > > OUTPUTFORMAT
> > >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > > LOCATION
> > >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > > TBLPROPERTIES (
> > >   'is_generic'='false',
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> > >   'sink.partition-commit.delay'='1 min',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'transient_lastDdlTime'='160466')
> > > Time taken: 0.252 seconds, Fetched: 25 row(s)
> > >
> > > 另外,下载了hive文件内容如下
> > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> > 11:25:38<0x01>INSERT
> > >
> > > 还是查询不到结果
> > > hive> select * from team;
> > > OK
> > > Time taken: 0.326 seconds
> > >
> > > 陈帅  于2020年11月1日周日 下午5:10写道:
> > >
> > >>
> > >>
> >
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> > >> 生成的hive分区文件路径类似于
> > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> > >>
> > >> 陈帅  于2020年11月1日周日 下午4:43写道:
> > >>
> > >>>
> >
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> > >>> shell查不到数据。
> > >>>
> > >>> import com.alibaba.fastjson.JSON;
> > >>> import com.alibaba.fastjson.JSONObject;
> > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >>> import org.apache.flink.api.common.typeinfo.Types;
> > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >>> import org.apache.flink.streaming.api.CheckpointingMode;
> > >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >>> import org.apache.flink.streaming.api.datastream.DataStream;
> > >>> import
> > >>>
> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> > >>> import
> > >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >>> import
> > >>>
> >
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> > >>> import org.apache.flink.streaming.api.windowing.time.Time;
> > >>> import
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> > >>> import
> > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> > >>> import org.apache.flink.table.api.EnvironmentSettings;
> > >>> import org.apache.flink.table.api.SqlDialect;
> > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > >>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> > >>> import org.apache.flink.types.Row;
> > >>> import org.apache.flink.types.RowKind;
> > >>>
> > >>> import java.time.Duration;
> > >>> import java.time.Instant;
> > >>> import java.time.LocalDateTime;
> > >>> import java.time.ZoneId;
> > >>> import java.time.format.DateTimeFormatter;
> > >>> import java.util.Properties;
> > >>>
> > >>> public class MysqlCDC2Hive {
> > >>>
> > >>> private static final DateTimeFormatter dtf =
> > >>> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
> > >>>
> > >>> public static void main(String[] args) throws Exception {
> > >>> StreamExecutionEnvironment streamEnv =
> > >>> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>>
> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >>> streamEnv.setParallelism(3);
> > >>> streamEnv.enableCheckpointing(6);
> > >>>
> > >>> EnvironmentSettings tableEnvSettings =
> > >>> EnvironmentSettings.newInstance()
> > >>> .useBlinkPlanner()
> > >>> 

Reply: flink mysql cdc + hive streaming question

2020-11-01 Thread Zhang Yuxiao
Hello,

The partition commit policy you configured both writes to the Hive metastore and generates a _SUCCESS file in the partition directory:

'sink.partition-commit.policy.kind' = 'metastore,success-file',

Check whether the _SUCCESS file was generated in the partition directory; if not, the Hive-side problem is likewise caused by the partition metadata not being committed.

The partition metadata commit latency seems to depend on the checkpoint interval plus the 'sink.partition-commit.delay'
setting, so try waiting for the sum of the two and then check whether Hive can query the data.

Best regards,
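To make the timing concrete, a rough sketch with the values used elsewhere in this thread (the table name is hypothetical; with a 1-minute checkpoint interval plus a 1-minute commit delay, expect roughly two minutes before a partition becomes queryable):

SET table.sql-dialect=hive;   -- e.g. in the SQL client; the Java job in this thread uses SqlDialect.HIVE instead
CREATE TABLE ods.team_demo (
  team_id INT,
  team_name STRING
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',                         -- wait this long past the partition time
  'sink.partition-commit.policy.kind' = 'metastore,success-file'   -- then update the metastore and write _SUCCESS
);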



From: 陈帅 
Sent: November 1, 2020, 05:36 PM
To: Jark Wu 
Cc: user-zh 
Subject: Re: flink mysql cdc + hive streaming question

最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
streaming不能自动注册hive分区吗?还是我使用的姿势不对?

陈帅  于2020年11月1日周日 下午5:24写道:

> 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> ") STORED AS TEXTFILE TBLPROPERTIES ("
>
> 这是生成的hive表建表语句
>
> hive> show create table team;
> OK
> CREATE TABLE `team`(
>   `team_id` int,
>   `team_name` string,
>   `create_time` string,
>   `update_time` string,
>   `op` string)
> PARTITIONED BY (
>   `dt` string,
>   `hr` string,
>   `mi` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> TBLPROPERTIES (
>   'is_generic'='false',
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'transient_lastDdlTime'='160466')
> Time taken: 0.252 seconds, Fetched: 25 row(s)
>
> 另外,下载了hive文件内容如下
> 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT
>
> 还是查询不到结果
> hive> select * from team;
> OK
> Time taken: 0.326 seconds
>
> 陈帅  于2020年11月1日周日 下午5:10写道:
>
>>
>> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
>> 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
>> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
>>
>> 陈帅  于2020年11月1日周日 下午4:43写道:
>>
>>> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
>>> shell查不到数据。
>>>
>>> import com.alibaba.fastjson.JSON;
>>> import com.alibaba.fastjson.JSONObject;
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import
>>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import
>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>> import
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.SqlDialect;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.types.RowKind;
>>>
>>> import java.time.Duration;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.time.ZoneId;
>>> import java.time.format.DateTimeFormatter;
>>> import java.util.Properties;
>>>
>>> public class MysqlCDC2Hive {
>>>
>>> private static final DateTimeFormatter dtf =
>>> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
>>>
>>> public static void main(String[] args) throws Exception {
>>> StreamExecutionEnvironment streamEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> streamEnv.setParallelism(3);
>>> streamEnv.enableCheckpointing(6);
>>>
>>> EnvironmentSettings tableEnvSettings =
>>> EnvironmentSettings.newInstance()
>>> .useBl

Re: flink mysql cdc + hive streaming question

2020-11-01 Thread 陈帅
I have checked that the Hive files are generated, following the partitions I defined. As you suggested I added a watermark on the ds2 stream; after running, the Hive files are still generated, but I still cannot query the data through the hive
shell.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class MysqlCDC2Hive {

private static final DateTimeFormatter dtf =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
streamEnv.setParallelism(3);
streamEnv.enableCheckpointing(6);

EnvironmentSettings tableEnvSettings =
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv, tableEnvSettings);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMinutes(1));

String catalogName = "hive_catalog";
HiveCatalog catalog = new HiveCatalog(
catalogName,
"default",
"/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
"2.3.4"
);
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);

MyDateFormat2 myDateFormat = new MyDateFormat2();
tableEnv.registerFunction("my_date_format", myDateFormat);

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
"team_id INT,\n" +
"team_name STRING,\n" +
"create_time TIMESTAMP,\n" +
"update_time TIMESTAMP,\n" +
"proctime as proctime()\n" +
") WITH (\n" +
"  'connector' = 'mysql-cdc',\n" +
"  'hostname' = 'localhost',\n" +
"  'port' = '3306',\n" +
"  'username' = 'root',\n" +
"  'password' = 'root',\n" +
"  'database-name' = 'test',\n" +
"  'table-name' = 'team'\n" +
")");

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
"  team_id INT,\n" +
"  team_name STRING,\n" +
"  create_time TIMESTAMP,\n" +
"  update_time TIMESTAMP\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'team',\n" +
"  'scan.startup.mode' = 'earliest-offset',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'format' = 'changelog-json'\n" +
")");

tableEnv.executeSql("INSERT INTO kafka.team \n" +
"SELECT team_id, team_name, create_time, update_time \n" +
"FROM cdc.team");

// define the stream that carries the op field
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test1`");

FlinkKafkaConsumerBase consumer = new FlinkKafkaConsumer<>(
"team",
new SimpleStringSchema(),
  

Re: flink mysql cdc + hive streaming question

2020-11-01 Thread 陈帅
Finally, after running “msck repair table team;” in the hive shell, the written data can be queried. Does flink hive
streaming really not register Hive partitions automatically, or am I using it the wrong way?

陈帅  于2020年11月1日周日 下午5:24写道:

> 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> ") STORED AS TEXTFILE TBLPROPERTIES ("
>
> 这是生成的hive表建表语句
>
> hive> show create table team;
> OK
> CREATE TABLE `team`(
>   `team_id` int,
>   `team_name` string,
>   `create_time` string,
>   `update_time` string,
>   `op` string)
> PARTITIONED BY (
>   `dt` string,
>   `hr` string,
>   `mi` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> TBLPROPERTIES (
>   'is_generic'='false',
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'transient_lastDdlTime'='160466')
> Time taken: 0.252 seconds, Fetched: 25 row(s)
>
> 另外,下载了hive文件内容如下
> 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT
>
> 还是查询不到结果
> hive> select * from team;
> OK
> Time taken: 0.326 seconds
>
> 陈帅  于2020年11月1日周日 下午5:10写道:
>
>>
>> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
>> 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
>> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
>>
>> 陈帅  于2020年11月1日周日 下午4:43写道:
>>
>>> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
>>> shell查不到数据。
>>>
>>> import com.alibaba.fastjson.JSON;
>>> import com.alibaba.fastjson.JSONObject;
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import
>>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import
>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>> import
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.SqlDialect;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.types.RowKind;
>>>
>>> import java.time.Duration;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.time.ZoneId;
>>> import java.time.format.DateTimeFormatter;
>>> import java.util.Properties;
>>>
>>> public class MysqlCDC2Hive {
>>>
>>> private static final DateTimeFormatter dtf =
>>> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
>>>
>>> public static void main(String[] args) throws Exception {
>>> StreamExecutionEnvironment streamEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> streamEnv.setParallelism(3);
>>> streamEnv.enableCheckpointing(6);
>>>
>>> EnvironmentSettings tableEnvSettings =
>>> EnvironmentSettings.newInstance()
>>> .useBlinkPlanner()
>>> .inStreamingMode()
>>> .build();
>>> StreamTableEnvironment tableEnv =
>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>>>
>>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>>> CheckpointingMode.EXACTLY_ONCE);
>>>
>>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>>> Duration.ofMinutes(1));
>>>
>>> String catalogName = "hive_catalog";
>>> HiveCatalog catalog = new HiveCatalog(
>>> catalogName,
>>> "default",
>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
>>> "2.3.4"
>>> );
>>> tableEnv.registerCatalog(catalogName, catalog);
>>> tableEnv.useCatalog(catalogName);
>>>
>>> MyDateFormat2 myDateFormat = new MyDateFormat2();
>>> tableEnv.registerFunction("my_date_format", myDateFormat);
>>>
>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>>> tableEnv.executeSql("DROP TABLE 

Re: flink mysql cdc + hive streaming question

2020-11-01 Thread 陈帅
Switched to TEXTFILE for storing the Hive table data so the Hive files can be downloaded and inspected:
") STORED AS TEXTFILE TBLPROPERTIES ("

This is the generated Hive table DDL:

hive> show create table team;
OK
CREATE TABLE `team`(
  `team_id` int,
  `team_name` string,
  `create_time` string,
  `update_time` string,
  `op` string)
PARTITIONED BY (
  `dt` string,
  `hr` string,
  `mi` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
TBLPROPERTIES (
  'is_generic'='false',
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.partition-commit.trigger'='partition-time',
  'transient_lastDdlTime'='160466')
Time taken: 0.252 seconds, Fetched: 25 row(s)

Also, the downloaded Hive file content is as follows:
1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT

The query still returns no results:
hive> select * from team;
OK
Time taken: 0.326 seconds

陈帅  于2020年11月1日周日 下午5:10写道:

>
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
>
> 陈帅  于2020年11月1日周日 下午4:43写道:
>
>> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
>> shell查不到数据。
>>
>> import com.alibaba.fastjson.JSON;
>> import com.alibaba.fastjson.JSONObject;
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import
>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import
>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.SqlDialect;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.types.RowKind;
>>
>> import java.time.Duration;
>> import java.time.Instant;
>> import java.time.LocalDateTime;
>> import java.time.ZoneId;
>> import java.time.format.DateTimeFormatter;
>> import java.util.Properties;
>>
>> public class MysqlCDC2Hive {
>>
>> private static final DateTimeFormatter dtf =
>> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment streamEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> streamEnv.setParallelism(3);
>> streamEnv.enableCheckpointing(6);
>>
>> EnvironmentSettings tableEnvSettings =
>> EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>>
>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>> CheckpointingMode.EXACTLY_ONCE);
>>
>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>> Duration.ofMinutes(1));
>>
>> String catalogName = "hive_catalog";
>> HiveCatalog catalog = new HiveCatalog(
>> catalogName,
>> "default",
>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
>> "2.3.4"
>> );
>> tableEnv.registerCatalog(catalogName, catalog);
>> tableEnv.useCatalog(catalogName);
>>
>> MyDateFormat2 myDateFormat = new MyDateFormat2();
>> tableEnv.registerFunction("my_date_format", myDateFormat);
>>
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
>> "team_id INT,\n" +
>> "team_name STRING,\n" +
>> "create_time TIMESTAMP,\n" +
>> "update_time TIMESTAMP,\n" +
>> "proctime 

Re: flink mysql cdc + hive streaming question

2020-11-01 Thread 陈帅
Previously, without the watermark and without partitioning, writing the Hive files and querying them worked fine; it is only after adding partitions that the Hive files are generated but cannot be queried, so I don't think it has much to do with whether the watermark is set.
The generated Hive partition file paths look like /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3

陈帅  于2020年11月1日周日 下午4:43写道:

> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> shell查不到数据。
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> import java.time.Duration;
> import java.time.Instant;
> import java.time.LocalDateTime;
> import java.time.ZoneId;
> import java.time.format.DateTimeFormatter;
> import java.util.Properties;
>
> public class MysqlCDC2Hive {
>
> private static final DateTimeFormatter dtf =
> DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss");
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> streamEnv.setParallelism(3);
> streamEnv.enableCheckpointing(6);
>
> EnvironmentSettings tableEnvSettings =
> EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE);
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofMinutes(1));
>
> String catalogName = "hive_catalog";
> HiveCatalog catalog = new HiveCatalog(
> catalogName,
> "default",
> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
> "2.3.4"
> );
> tableEnv.registerCatalog(catalogName, catalog);
> tableEnv.useCatalog(catalogName);
>
> MyDateFormat2 myDateFormat = new MyDateFormat2();
> tableEnv.registerFunction("my_date_format", myDateFormat);
>
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
> tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
> "team_id INT,\n" +
> "team_name STRING,\n" +
> "create_time TIMESTAMP,\n" +
> "update_time TIMESTAMP,\n" +
> "proctime as proctime()\n" +
> ") WITH (\n" +
> "  'connector' = 'mysql-cdc',\n" +
> "  'hostname' = 'localhost',\n" +
> "  'port' = '3306',\n" +
> "  'username' = 'root',\n" +
> "  'password' = 'root',\n" +
> "  'database-name' = 'test',\n" +
> "  'table-name' = 'team'\n" +
> ")");
>
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
> tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
> "  team_id INT,\n" +
> "  team_name STRING,\n" +
> "  create_time TIMESTAMP,\n" +
> "  update_time TIMESTAMP\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'team',\n" +
> "  'scan.startup.mode' = 'earliest-offset',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'format' = 'changelog-json'\n" +
> ")");
>
> tableEnv.executeSql("INSERT INTO kafka.team \n" +
>

Re: flink mysql cdc + hive streaming question

2020-10-31 Thread Jark Wu
Could you check whether the Hive files are being generated correctly?

Looking at your code above, there is no watermark in the kafka->hive pipeline, while the "partition-time" trigger policy
is driven by watermarks, so that is probably why there is no data in Hive.

Best,
Jark


[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
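If adding a watermark is inconvenient, the filesystem connector docs linked above also describe a processing-time based trigger; a sketch against the table layout used in this thread (illustrative, not the exact job):

CREATE TABLE ods.team (
  team_id INT,
  team_name STRING,
  create_time STRING,
  update_time STRING,
  op STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS TEXTFILE TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'process-time',   -- commit on processing time, no watermark needed
  'sink.partition-commit.delay' = '0 s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);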

On Sat, 31 Oct 2020 at 17:25, 陈帅  wrote:

> 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive
> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗?
>
> cdc -> kafka示例消息如下
> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31
> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> import java.time.Duration;
> import java.util.Properties;
>
> public class MysqlCDC2Hive {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> streamEnv.setParallelism(3);
> streamEnv.enableCheckpointing(6);
>
> EnvironmentSettings tableEnvSettings =
> EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE);
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofMinutes(1));
>
> String catalogName = "hive_catalog";
> HiveCatalog catalog = new HiveCatalog(
> catalogName,
> "default",
> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
> "2.3.4"
> );
> tableEnv.registerCatalog(catalogName, catalog);
> tableEnv.useCatalog(catalogName);
>
> MyDateFormat2 myDateFormat = new MyDateFormat2();
> tableEnv.registerFunction("my_date_format", myDateFormat);
>
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
> tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
> "team_id INT,\n" +
> "team_name STRING,\n" +
> "create_time TIMESTAMP,\n" +
> "update_time TIMESTAMP,\n" +
> "proctime as proctime()\n" +
> ") WITH (\n" +
> "  'connector' = 'mysql-cdc',\n" +
> "  'hostname' = 'localhost',\n" +
> "  'port' = '3306',\n" +
> "  'username' = 'root',\n" +
> "  'password' = 'root',\n" +
> "  'database-name' = 'test',\n" +
> "  'table-name' = 'team'\n" +
> ")");
>
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
> tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
> "  team_id INT,\n" +
> "  team_name STRING,\n" +
> "  create_time TIMESTAMP,\n" +
> "  update_time TIMESTAMP\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'team',\n" +
> "  'scan.startup.mode' = 'earliest-offset',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'format' = 'changelog-json'\n" +
> ")");
>
> tableEnv.executeSql("INSERT INTO kafka.team \n" +
> "SELECT team_id, team_name, create_time, update_time \n" +
> "FROM cdc.team");
>
> // 定义带op字段的stream
>

Re: flink mysql cdc + hive streaming question

2020-10-31 Thread 陈帅
Thanks Jark for the detailed answer. I tried it following your approach. The problem I ran into is that without Hive partitioning, writing and reading work fine, but after enabling time partitioning on the Hive table the write succeeds, yet no data can be queried through the hive
shell, although the table schema is correct. (I commented out that code.) Could you help me see where I got it wrong?

The cdc -> kafka sample message looks like this:
{"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31
11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.util.Properties;

public class MysqlCDC2Hive {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
streamEnv.setParallelism(3);
streamEnv.enableCheckpointing(6);

EnvironmentSettings tableEnvSettings =
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv, tableEnvSettings);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMinutes(1));

String catalogName = "hive_catalog";
HiveCatalog catalog = new HiveCatalog(
catalogName,
"default",
"/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
"2.3.4"
);
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);

MyDateFormat2 myDateFormat = new MyDateFormat2();
tableEnv.registerFunction("my_date_format", myDateFormat);

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
"team_id INT,\n" +
"team_name STRING,\n" +
"create_time TIMESTAMP,\n" +
"update_time TIMESTAMP,\n" +
"proctime as proctime()\n" +
") WITH (\n" +
"  'connector' = 'mysql-cdc',\n" +
"  'hostname' = 'localhost',\n" +
"  'port' = '3306',\n" +
"  'username' = 'root',\n" +
"  'password' = 'root',\n" +
"  'database-name' = 'test',\n" +
"  'table-name' = 'team'\n" +
")");

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
"  team_id INT,\n" +
"  team_name STRING,\n" +
"  create_time TIMESTAMP,\n" +
"  update_time TIMESTAMP\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'team',\n" +
"  'scan.startup.mode' = 'earliest-offset',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'format' = 'changelog-json'\n" +
")");

tableEnv.executeSql("INSERT INTO kafka.team \n" +
"SELECT team_id, team_name, create_time, update_time \n" +
"FROM cdc.team");

// 定义带op字段的stream
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumerBase consumer = new FlinkKafkaConsumer<>(
"team",
new SimpleStringSchema(),
properties
).setStartFromEarliest();

DataStream ds = streamEnv.addSource(consumer);

String[] fieldNames = {"team_id", "team_name", "create_time",
"update_time", "op"};
   

Re: flink mysql cdc + hive streaming question

2020-10-30 Thread Jark Wu
1. Yes. Hive currently cannot consume a changelog directly, mainly because Hive's support for cdc is not very good. Even the Hive
ACID/transaction feature is rarely used, since it does not integrate well with other compute engines.

2. The cdc -> kafka -> hive streaming approach is feasible, but kafka -> hive streaming
amounts to raw data synchronization: what lands in Hive is still the cdc log content, not merged in real time, so you have to write your own query in Hive
to do the merging. See this article for the merge process [1].

3. You can use ts + INTERVAL '8' HOUR

PS: In 1.12 we plan to let Hive write changelog data directly, so cdc could go straight to hive
streaming without the Kafka in between. But once the data is in Hive, you still need a separate query to merge it in real time.

Best,
Jark
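To illustrate point 2, a sketch of a Hive-side merge over the raw changelog rows (column names follow the team table in this thread; it assumes an op column recording the change kind and that update_time reflects change order):

SELECT team_id, team_name, create_time, update_time
FROM (
  SELECT t.*,
         ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY update_time DESC) AS rn
  FROM ods.team t
) x
WHERE x.rn = 1
  AND x.op <> 'DELETE';   -- drop keys whose latest change is a delete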

On Sat, 31 Oct 2020 at 13:26, 罗显宴 <15927482...@163.com> wrote:

> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年10月31日 12:06,陈帅 写道:
> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> AppendStreamTableSink doesn't support consuming update and delete changes
> which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
> team]], fields=[team_id, team_name, create_time, update_time])
>
> 我的问题:
> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢?
> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> kafka,然后kafka
> -> hive streaming? 谢谢!
> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么?
>
> sql语句如下
>
> CREATE DATABASE IF NOT EXISTS cdc
>
> DROP TABLE IF EXISTS cdc.team
>
> CREATE TABLE team(
>team_id BIGINT,
>team_name STRING,
>create_time TIMESTAMP,
>update_time TIMESTAMP,
> proctime as proctime()
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'localhost',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'team'
> )
>
> CREATE DATABASE IF NOT EXISTS ods
>
> DROP TABLE IF EXISTS ods.team
>
> CREATE TABLE ods.team (
>  team_id BIGINT,
>  team_name STRING,
>  create_time TIMESTAMP,
>  update_time TIMESTAMP,
> ) PARTITIONED BY (
>  ts_date STRING,
>  ts_hour STRING,
>  ts_minute STRING,
> ) STORED AS PARQUET TBLPROPERTIES (
>  'sink.partition-commit.trigger' = 'partition-time',
>  'sink.partition-commit.delay' = '1 min',
>  'sink.partition-commit.policy.kind' = 'metastore,success-file',
>  'partition.time-extractor.timestamp-pattern' = '$ts_date
> $ts_hour:$ts_minute:00'
> )
>
> INSERT INTO ods.team
> SELECT team_id, team_name, create_time, update_time,
>  my_date_format(create_time,'-MM-dd', 'Asia/Shanghai'),
>  my_date_format(create_time,'HH', 'Asia/Shanghai'),
>  my_date_format(create_time,'mm', 'Asia/Shanghai')
> FROM cdc.team
>


Reply: flink mysql cdc + hive streaming question

2020-10-30 Thread 罗显宴
Hive 3 can, Hive 2 cannot. Adding Kafka in between probably will not help either; before Hive 3 the warehouse generally did not support modifying data. I am not sure this answer is correct — corrections welcome.


| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制

在2020年10月31日 12:06,陈帅 写道:
我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 make it work?
2. To support this scenario, do I need to add a Kafka layer in the middle (via the changelog-json format), i.e. cdc -> kafka, and then kafka
-> hive streaming? Thanks!
3. The time produced by DATE_FORMAT is in UTC. How can I convert it to GMT+8? Is a UDF the only way?

The SQL statements are as follows:

CREATE DATABASE IF NOT EXISTS cdc

DROP TABLE IF EXISTS cdc.team

CREATE TABLE team(
   team_id BIGINT,
   team_name STRING,
   create_time TIMESTAMP,
   update_time TIMESTAMP,
proctime as proctime()
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'team'
)

CREATE DATABASE IF NOT EXISTS ods

DROP TABLE IF EXISTS ods.team

CREATE TABLE ods.team (
 team_id BIGINT,
 team_name STRING,
 create_time TIMESTAMP,
 update_time TIMESTAMP,
) PARTITIONED BY (
 ts_date STRING,
 ts_hour STRING,
 ts_minute STRING,
) STORED AS PARQUET TBLPROPERTIES (
 'sink.partition-commit.trigger' = 'partition-time',
 'sink.partition-commit.delay' = '1 min',
 'sink.partition-commit.policy.kind' = 'metastore,success-file',
 'partition.time-extractor.timestamp-pattern' = '$ts_date
$ts_hour:$ts_minute:00'
)

INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
 my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
 my_date_format(create_time,'HH', 'Asia/Shanghai'),
 my_date_format(create_time,'mm', 'Asia/Shanghai')
FROM cdc.team


flink mysql cdc + hive streaming question

2020-10-30 Thread 陈帅
I want to use Flink SQL's mysql-cdc connector to sync MySQL table data into Hive in real time. When I run it, it throws:

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 make it work?
2. To support this scenario, do I need to add a Kafka layer in the middle (via the changelog-json format), i.e. cdc -> kafka, and then kafka
-> hive streaming? Thanks! (A sketch of this route is given after the SQL below.)
3. The time produced by DATE_FORMAT is in UTC. How can I convert it to GMT+8? Is a UDF the only way?

The SQL statements are as follows:

CREATE DATABASE IF NOT EXISTS cdc

DROP TABLE IF EXISTS cdc.team

CREATE TABLE team(
team_id BIGINT,
team_name STRING,
create_time TIMESTAMP,
update_time TIMESTAMP,
proctime as proctime()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test',
  'table-name' = 'team'
)

CREATE DATABASE IF NOT EXISTS ods

DROP TABLE IF EXISTS ods.team

CREATE TABLE ods.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP,
) PARTITIONED BY (
  ts_date STRING,
  ts_hour STRING,
  ts_minute STRING,
) STORED AS PARQUET TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'partition.time-extractor.timestamp-pattern' = '$ts_date
$ts_hour:$ts_minute:00'
)

INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
  my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
  my_date_format(create_time,'HH', 'Asia/Shanghai'),
  my_date_format(create_time,'mm', 'Asia/Shanghai')
FROM cdc.team
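
A hedged sketch of the kafka leg from question 2, assuming the changelog-json format from flink-cdc-connectors is on the classpath; the topic name, group id, and bootstrap servers are placeholders:

CREATE TABLE kafka_team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'team',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'team-sync',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'changelog-json'
)

INSERT INTO kafka_team
SELECT team_id, team_name, create_time, update_time FROM cdc.team

A second job then reads kafka_team and streams the raw changelog rows into the Hive table, where they still have to be merged with a query like the one sketched under Jark's reply above.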


Re: Question about syncing flink sql CDC to hive

2020-10-21 Thread 史 正超
This is not supported yet, because Hive does not support updates, and the filesystem connector is not implemented as a DynamicTableFactory (FLIP-95
connector) but still uses the old interface.
If you do not need update operations to reach Hive, one idea is to modify the mysql-cdc source code so that the ChangeLogMode it supports is INSERT only, and then add a `DebeziumDeserializationSchema` class that parses every message as an
insert and declares all the fields.

From: zhongbaoluo 
Sent: 2020-10-21 11:55
To: user-zh 
Subject: Question about syncing flink sql CDC to hive

We have a requirement to sync MySQL data into a Hive database in real time using a CDC approach. Does Flink have any solution for handling update and delete
operations during this sync?


**
Thanks & Best Regards!


杉欣集团 - Technology Research Institute, Cloud Platform
钟保罗


23F, Tower B, Zhenhua Plaza, 3261 Dongfang Road, Pudong New District, Shanghai (杉欣集团)
email: zhongbao...@shxgroup.net
Mobile: 18157855633


Question about syncing flink sql CDC to hive

2020-10-21 Thread zhongbaoluo
We have a requirement to sync MySQL data into a Hive database in real time using a CDC approach. Does Flink have any solution for handling update and delete
operations during this sync?


**
Thanks & Best Regards!


杉欣集团 - Technology Research Institute, Cloud Platform
钟保罗


23F, Tower B, Zhenhua Plaza, 3261 Dongfang Road, Pudong New District, Shanghai (杉欣集团)
email: zhongbao...@shxgroup.net
Mobile: 18157855633

Re: Re: About flink sql cdc

2020-09-29 Thread 史 正超
Hi Kyle Zhang, I just reproduced your problem. Although your MySQL
binlog is set to the ROW format, it cannot be ruled out that another session changed binlog_format. Steps to reproduce:

  1.  Log into the mysql client (note: log in via cmd) and run: SET SESSION binlog_format='MIXED'; SET SESSION 
tx_isolation='REPEATABLE-READ'; COMMIT;
  2.  Then run any update or insert statement.

Then I got the same error as you:
2020-09-30 10:46:37.607 [debezium-engine] ERROR 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction  - Reporting error:
org.apache.kafka.connect.errors.ConnectException: Received DML 'update orders 
set product_id = 1122 where order_number = 10001' for processing, binlog 
probably contains events generated with statement or mixed based replication 
format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML 
'update orders set product_id = 1122 where order_number = 10001' for 
processing, binlog probably contains events generated with statement or mixed 
based replication format
at 
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
... 5 common frames omitted

So most likely some other session changed the binlog_format, with the transaction isolation level set to REPEATABLE-READ; a quick check is sketched below.
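
A minimal MySQL-side check along these lines, assuming you have the privileges to read and set global variables:

-- the global value is what new sessions inherit; an already-connected session may still be on MIXED or STATEMENT
SHOW GLOBAL VARIABLES LIKE 'binlog_format';
SHOW SESSION VARIABLES LIKE 'binlog_format';
-- restore row-based logging for new sessions; existing sessions keep their old value until they reconnect
SET GLOBAL binlog_format = 'ROW';
SET GLOBAL binlog_row_image = 'FULL';
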
Hope this helps,
best,
shizhengchao

From: 谢治平 
Sent: 2020-09-30 1:25
To: user-zh 
Cc: user-zh 
Subject: Re: About flink sql cdc

Could you remove my email from this list? I want to unsubscribe.




谢治平
Email: xiezhiping...@163.com

On 2020-09-30 09:24, Kyle Zhang wrote:
show variables like '%binlog_format%' does indeed show ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi all,
>   I ran into a problem using sql cdc today. Version 1.11.2, running in IDEA. My DDL is:
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> The result is written directly to a print table.
> After running for a while it reports this error:
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> Does sql cdc also parse my other tables and then fail on them? Has anyone run into a similar problem?
>
> Best,
> Kyle Zhang
>


Re: About flink sql cdc

2020-09-29 Thread 谢治平
Could you remove my email from this list? I want to unsubscribe.




谢治平
Email: xiezhiping...@163.com

On 2020-09-30 09:24, Kyle Zhang wrote:
show variables like '%binlog_format%' does indeed show ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi all,
>   I ran into a problem using sql cdc today. Version 1.11.2, running in IDEA. My DDL is:
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> The result is written directly to a print table.
> After running for a while it reports this error:
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> Does sql cdc also parse my other tables and then fail on them? Has anyone run into a similar problem?
>
> Best,
> Kyle Zhang
>


Re: About flink sql cdc

2020-09-29 Thread Kyle Zhang
show variables like '%binlog_format%' does indeed show ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi all,
>   I ran into a problem using sql cdc today. Version 1.11.2, running in IDEA. My DDL is:
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> The result is written directly to a print table.
> After running for a while it reports this error:
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> Does sql cdc also parse my other tables and then fail on them? Has anyone run into a similar problem?
>
> Best,
> Kyle Zhang
>


Re: About flink sql cdc

2020-09-29 Thread Kyle Zhang
CC426ED884CB5EEC8E3B29284D6705620D76567D071BAEDEBCC57C223751AA7DF9F8EAF22432BA1A8C511EDFCF4653936D27F9FBF18AEBE58A4BE2E14620526BA55E9E305FCBE4813B9FE57047FB42DAC2F8CA54346CCFF19BA0DAA8078D124FC04A436DD68398FADA2570A567F6F21BE8E94F55305818EDD127D7A798778FCA366A47B94B910EEC72ADFE4297DABFE852FB0F75E1873BD817FFCB19BD72AAB0C52190DF302922C95508D4248CB954C5492532012535F461038AB574AA548511DEB1C0BDA19C112C78AA3CCB9683060BDAE74911944B6D6C36488C49CB7B45A0DB1BF4C65F669DE968140E26FDC7D9683A18F4069F8FAE791BC30DA05411925E70E5004761BC5CEC86BA4BCA7473D70378C6E022B4241322775ADA9CCD8F14D2925F80D595F9F9C02D8FC1675B478B5ED6ECF3CB9291747CAE20CAE775D63B99303AB1E63B089F9D499D955A65CF5C05E862CD6232873D08AF40D4DB80671B30575CACFBD2F9F14E0B762852A064BC238F7B2E4E76EB32272D974256F7264286A7DB556B7EAC37F0632FBF0077C290D8EF05'
WHERE trigger_id=42' for processing, binlog probably contains events
generated with statement or mixed based replication format
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]


On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi all,
>   I ran into a problem using sql cdc today. Version 1.11.2, running in IDEA. My DDL is:
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> The result is written directly to a print table.
> After running for a while it reports this error:
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> Does sql cdc also parse my other tables and then fail on them? Has anyone run into a similar problem?
>
> Best,
> Kyle Zhang
>


About flink sql cdc

2020-09-29 Thread Kyle Zhang
Hi all,
  I ran into a problem using sql cdc today. Version 1.11.2, running in IDEA. My DDL is:
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 emp_name STRING,
 age INT
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'xxx',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'empoylee1'
);
The result is written directly to a print table.
After running for a while it reports this error:
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Error during binlog processing. Last offset stored = null, binlog
reader near position = binlog.001254/132686776
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT INTO
execution_flows (project_id, flow_id, version, status, submit_time,
submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
'INSERT INTO execution_flows (project_id, flow_id, version, status,
submit_time, submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
... 5 more

Does sql cdc also parse my other tables and then fail on them? Has anyone run into a similar problem?

Best,
Kyle Zhang

