ERROR log:
.
Job has been submitted with JobID 91ac323d4d5338418883240680192f34
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/table/table_environment.py",
 line 907, in execute
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
91ac323d4d5338418883240680192f34)
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 91ac323d4d5338418883240680192f34)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, 
backoffTimeMS=6)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalStateException: Process died with exit code 0
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:211)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:202)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
        at 
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:179)
        at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
        at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
        at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
        at 
org.apache.flink.table.runtime.operators.python.BaseRowPythonScalarFunctionOperator.open(BaseRowPythonScalarFunctionOperator.java:86)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Process died with exit code 0
        at 
org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:74)
        at 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:125)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
        ... 20 more



> 在 2020年3月27日,上午11:22,WuPangang <wpangang1...@icloud.com.INVALID> 写道:
> 
> 感谢大佬回复。
> 根据邮件里面的提示下我尝试了如下操作:
> 
> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
> def str_add(str_name):
>  return '1'
> table_env.register_function("str_add", str_add)
> table_env.sql_update("insert into flink_sinktable_ad_test_1 \
>                        select \
>                        str_add(topicid) AS topicid \
>                        from \
>                        flink_sourcetable_ad_test \
>                        ")
> 目的:我的目的是想通过最简单的方式看看udf是否有生效。
> 结果:结果依然没有数据流入进来。
> 其他手段和测试:我通过不使用udf来验证数据流是否正常的。结果正常。
> 
> 
> 所以能再分析下么?或者我应该如何深入的跟踪下?
> 
> --------
> all code below:
> from pyflink.datastream import StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,TableSink,TableConfig,DataTypes
> from pyflink.table.descriptors import Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
> from pyflink.common import RestartStrategies
> from pyflink.table.udf import udf
> import json
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> ##contain设置
> env.set_parallelism(12)
> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
> ##使用blink api
> environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> table_env = StreamTableEnvironment.create(env,environment_settings=environment_settings)
> 
> table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
> host STRING, \
> type STRING, \
> topicid STRING, \
> message STRING, \
> proctime as PROCTIME() \
> ) WITH ( \
>  'connector.type' = 'kafka',        \
>  'connector.version' = 'universal', \
>  'connector.topic' = 'advertise_module',  \
>  'connector.properties.zookeeper.connect' = 'localhost:2181', \
>  'connector.properties.bootstrap.servers' = '172.25.80.134:9092', \
>  'connector.properties.group.id' = 'flink_1.10_test_source', \
>  'connector.startup-mode' = 'latest-offset', \
>  'format.type' = 'json' \
> )")
> 
> table_env.sql_update("CREATE TABLE flink_sinktable_ad_test_1 ( \
> topicid STRING \
> ) WITH ( \
>  'connector.type' = 'kafka',        \
>  'connector.version' = 'universal', \
>  'connector.topic' = 'recommend_user_concern_test',  \
>  'connector.properties.zookeeper.connect' = 'localhost:2181', \
>  'connector.properties.bootstrap.servers' = '172.25.82.77:9092', \
>  'connector.properties.group.id' = 'flink_1.10_test_sink', \
>  'connector.startup-mode' = 'latest-offset', \
>  'connector.properties.retries' = '3', \
>  'format.type' = 'json', \
>  'connector.properties.update_mode' = 'append' \
> )")
> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
> def str_add(str_name):
>  return '1'
> table_env.register_function("str_add", str_add)
> #table_env.register_function("str_add", udf(lambda i: i + '1', DataTypes.STRING(), DataTypes.STRING()))
> table_env.sql_update("insert into flink_sinktable_ad_test_1 \
>                        select \
>                        str_add(topicid) AS topicid \
>                        from \
>                        flink_sourcetable_ad_test \
>                        ")
> table_env.execute('flink_1.10_test')
> 
> ------
>> 在 2020年3月26日,下午5:55,jincheng sun <sunjincheng...@gmail.com> 写道:
>> 
>> 比较明显的一个问题是UDF的定义有些问题:你的sink表定义了一个STRING类型的字段,但是你SQL里面SELECT的是udf的结果,所以我想你的UDF应该也返回一个STRING,也就是result_type=DataTypes.STRING(),同时确保你的udf真的返回的是一个字符串,而不是json的Object。
>> 
>> Best,
>> Jincheng
>> 
>> 
>> WuPangang <wpangang1...@icloud.com> 于2020年3月26日周四 下午5:24写道:
>> Data as below:
>> {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23
>>  
>> 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com
>>  
>> <http://down-ddz.734399.com/>\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/
>>  (PRA-AL00X; Android; Android OS ; 8.0.0; zh) 
>> ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/down-ddz.734399.com
>>  
>> <http://down-ddz.734399.com/>\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/p12.jmstatic.com
>>  
>> <http://p12.jmstatic.com/>\\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”}
>> Problem:
>> 数据是个嵌套json,并且核心字段message的格式不能直接通过table api json 相关的方法来处理。
>> 自己思考的解决思路:通过udf, 使用json.loads来处理。
>> 实际使用中遇到的问题: job提交之后,在dashboard上发现bytes received,records received 
>> 都是0;下游是同步给Kafka,去消费下游kafka也没有数据。
>> 
>> 
>> Code as below:
>> from pyflink.datastream import StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,TableSink,TableConfig,DataTypes
>> from pyflink.table.descriptors import Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
>> from pyflink.common import RestartStrategies
>> from pyflink.table.udf import udf
>> import json
>> 
>> env = StreamExecutionEnvironment.get_execution_environment()
>> #env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> ##checkpoint设置
>> #env.enable_checkpointing(300000)
>> #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE)
>> #env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
>> #env.get_checkpoint_config().set_checkpoint_timeout(60000)
>> #env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
>> #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False)
>> ##contain设置
>> env.set_parallelism(12)
>> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
>> ##使用blink api
>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
>> table_env = StreamTableEnvironment.create(env,environment_settings=environment_settings)
>> 
>> 
>> table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
>> host STRING, \
>> type STRING, \
>> topicid STRING, \
>> message STRING, \
>> proctime as PROCTIME() \
>> ) WITH ( \
>>  'connector.type' = 'kafka',        \
>>  'connector.version' = 'universal', \
>>  'connector.topic' = 'advertise_module',  \
>>  'connector.properties.zookeeper.connect' = 'localhost:2181', \
>>  'connector.properties.bootstrap.servers' = 'localhost:9092', \
>>  'connector.properties.group.id' = 'flink_1.10_test_source', \
>>  'connector.startup-mode' = 'latest-offset', \
>>  'format.type' = 'json', \
>>  'format.derive-schema' = 'true' \
>> )")
>> 
>> 
>> table_env.sql_update("CREATE TABLE flink_sinktable_ad_test ( \
>> message STRING \
>> ) WITH ( \
>>  'connector.type' = 'kafka',        \
>>  'connector.version' = 'universal', \
>>  'connector.topic' = 'recommend_user_concern_test',  \
>>  'connector.properties.zookeeper.connect' = 'localhost:2181', \
>>  'connector.properties.bootstrap.servers' = 'localhost:9092', \
>>  'connector.properties.group.id' = 'flink_1.10_test_sink', \
>>  'connector.startup-mode' = 'latest-offset', \
>>  'format.type' = 'json', \
>>  'connector.properties.retries' = '3', \
>>  'connector.properties.update_mode' = 'append' \
>> )")
>> 
>> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW())
>> def json_split(message):
>>  return json.loads(message.replace('\"', '"').replace('"{"', '{"').replace('}"', '}'))
>> table_env.register_function("json_split", json_split)
>> 
>> table_env.sql_update("insert into flink_sinktable_ad_test \
>>                        select \
>>                        json_split(message) AS message\
>>                        from \
>>                        flink_sourcetable_ad_test \
>>                        ")
>> table_env.execute('flink_1.10_test')
>> 
> 

回复