????????????????????null??????????????????????????????
??null ---- ??: "user-zh"
??????????????null??????????????????????????????
2022-09-09 11:36:42,866 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1) (2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015). java.lang.RuntimeException: null at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181] Caused by: java.lang.NullPointerException at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at
Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.
??Flink 1.13.2 HiveCatalogHive Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys. at org.apache.flink.table.runtime.hashtable.BinaryHashTable.buildTableFromSpilledPartition(BinaryHashTable.java:443) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2] at org.apache.flink.table.runtime.hashtable.BinaryHashTable.prepareNextPartition(BinaryHashTable.java:403) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2] at org.apache.flink.table.runtime.hashtable.BinaryHashTable.nextMatching(BinaryHashTable.java:265) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2] at org.apache.flink.table.runtime.operators.join.HashJoinOperator.endInput(HashJoinOperator.java:176) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2] at org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.endOperatorInput(TableOperatorWrapper.java:124) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2] at org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.endInput(BatchMultipleInputStreamOperator.java:56) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:93) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:428) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:87) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) 
~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.2.jar:1.13.2] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
flink 1.13.2 ????hive????????????????NullPointerException
2022-01-23 04:31:39,568 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents - Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)]) (1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING. 2022-01-23 04:31:39,570 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution 07b2cd514c6b6d85f79ab5b953971f82. 2022-01-23 04:31:39,570 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - MultipleInput(readOrder=[0,0,1], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, $f109, jobid0, reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:- Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n: +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, $f94, jobid0, reportno0, dayssincelast], build=[right])\n: :- Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, bigint(reportno) AS $f94])\n: : +- [#3] Exchange(distribution=[hash[jobid]])\n: +- [#2] Exchange(distribution=[hash[jobid]])\n+- [#1] Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0, reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2 AS 
$f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11, numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14]) - HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate, MAX($f14) AS lasbopfunctiontestdate]) - Calc(select=[$f0 AS jobid, $f1 AS reportno, string($f6) AS reportdate, bigint((nvl($f7, 0) + nvl($f8, 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS pobcnt, $f2 AS dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5 AS futureops, lasboptestdate, lasbopfunctiontestdate]) - Map - Sink: Unnamed (1/1) (3c555cbd6bf411a6111cf7eaab527d33) switched from CREATED to CANCELING. 2022-01-23 04:31:39,570 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - MultipleInput(readOrder=[0,0,1], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, $f109, jobid0, reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:- Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n: +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, $f94, jobid0, reportno0, dayssincelast], build=[right])\n: :- Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, currentops, futureops, bigint(reportno) AS $f94])\n: : +- [#3] 
Exchange(distribution=[hash[jobid]])\n: +- [#2] Exchange(distribution=[hash[jobid]])\n+- [#1] Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0, reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2 AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11, numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14]) - HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate, MAX($f14) AS lasbopfunctiontestdate]) - Calc(select=[$f0 AS jobid, $f1 AS reportno, string($f6) AS reportdate, bigint((nvl($f7, 0) + nvl($f8, 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS pobcnt, $f2 AS
?????? ??????????????downloads/setup-pyflink-virtual-env.sh????
??venv.zip??1.14.0?? ---- ??: "user-zh" https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable ??1.14.0??-pyclientexec venv.zip/venv/bin/python On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalidgt; wrote: gt; ??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?? gt; jobmanagerNo module named pyflinkjobmanageryarn gt; ?? gt; gt; gt; amp;gt; LogType:jobmanager.out gt; amp;gt; Log Upload Time:?? ?? 18 20:48:45 +0800 2021 gt; amp;gt; LogLength:37 gt; amp;gt; Log Contents: gt; amp;gt; /bin/python: No module named pyflink gt; gt; gt; gt; gt; --amp;nbsp;amp;nbsp;-- gt; ??: gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; "user-zh" gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; < gt; dian0511...@gmail.comamp;gt;; gt; :amp;nbsp;2021??11??19??(??) 9:38 gt; ??:amp;nbsp;"user-zh"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client gt <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-clientgt; ; gt; On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalid amp;gt; gt; wrote: gt; gt; amp;gt; Hi ! gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp;java Table api??python udf gt; amp;gt; pythonjm??/bin/python: No module gt; named gt; amp;gt; pyflink?? 
gt; amp;gt; gt; amp;gt; gt; amp;gt; ./flink-1.13.2/bin/flinkamp;amp;nbsp; gt; amp;gt; run-application -t yarn-applicationamp;amp;nbsp; gt; amp;gt; gt; -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"amp;amp;nbsp; gt; amp;gt; -Dyarn.application.queue=d gt; amp;gt; -p 1amp;amp;nbsp; gt; amp;gt; -pyarch /opt/venv.zip gt; amp;gt; -pyexec venv.zip/venv/bin/pythonamp;amp;nbsp; gt; amp;gt; -pyfs /opt/test.pyamp;amp;nbsp; gt; amp;gt; -c test.PyUDFTestamp;amp;nbsp; gt; amp;gt; /opt/flink-python-test-1.0-SNAPSHOT.jar gt; amp;gt; gt; amp;gt; gt; amp;gt; gt; amp;gt; ?? gt; amp;gt; Caused by: java.lang.RuntimeException: Python callback server start gt; failed! gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167) gt; amp;gt; ~[flink-python_2.11-1.13.2.jar:1.13.2] gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88) gt; amp;gt; ~[flink-python_2.11-1.13.2.jar:1.13.2] gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84) gt; amp;gt; ~[flink-python_2.11-1.13.2.jar:1.13.2] gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) gt; amp;gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1] gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) gt; amp;gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1] gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) gt; amp;gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1] gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; amp;amp;nbsp; at gt; amp;gt; gt; org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(
?????? ??????????????downloads/setup-pyflink-virtual-env.sh????
Hi! ??python??-D??-py ./flink-1.14.0/bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib" -Dyarn.application.queue=d -Dpython.archives="/opt/venv.zip" -Dpython.client.executable="venv.zip/venv/bin/python" -Dpython.executable="venv.zip/venv/bin/python" -Dpython.files="/opt/test.py" -p 1 -c test.PyUDFTest /opt/flink-python-test-1.0-SNAPSHOT.jar ?? taskmanager.log 2021-11-19 13:59:24,030 ERROR /yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:296 [] - Error processing instruction 1. Original traceback is Traceback (most recent call last): File "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() File "/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable ??1.14.0??-pyclientexec venv.zip/venv/bin/python On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalid wrote: ??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?? jobmanagerNo module named pyflinkjobmanageryarn ?? gt; LogType:jobmanager.out gt; Log Upload Time:?? ?? 18 20:48:45 +0800 2021 gt; LogLength:37 gt; Log Contents: gt; /bin/python: No module named pyflink --nbsp;nbsp;-- ??: "user-zh" < dian0511...@gmail.comgt;; :nbsp;2021??11??19??(??) 
9:38 ??:nbsp;"user-zh"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalidgt; wrote: gt; Hi ! gt; amp;nbsp; amp;nbsp;java Table api??python udf gt; pythonjm??/bin/python: No module named gt; pyflink?? gt; gt; gt; ./flink-1.13.2/bin/flinkamp;nbsp; gt; run-application -t yarn-applicationamp;nbsp; gt; -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"amp;nbsp; gt; -Dyarn.application.queue=d gt; -p 1amp;nbsp; gt; -pyarch /opt/venv.zip gt; -pyexec venv.zip/venv/bin/pythonamp;nbsp; gt; -pyfs /opt/test.pyamp;nbsp; gt; -c test.PyUDFTestamp;nbsp; gt; /opt/flink-python-test-1.0-SNAPSHOT.jar gt; gt; gt; gt; ?? gt; Caused by: java.lang.RuntimeException: Python callback server start failed! gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; at gt; org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167) gt; ~[flink-python_2.11-1.13.2.jar:1.13.2] gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; at gt; org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88) gt; ~[flink-python_2.11-1.13.2.jar:1.13.2] gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; at gt; org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84) gt; ~[flink-python_2.11-1.13.2.jar:1.13.2] gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; at gt; org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1] gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; at gt; org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1] gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp; at gt; org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) 
> ~[flink-dist_2.11-1.13.1.jar:1.13.1] >     at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] >     at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] >     at > org.apache.f
?????? ??????????????downloads/setup-pyflink-virtual-env.sh????
??source my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE?? jobmanagerNo module named pyflinkjobmanageryarn ?? LogType:jobmanager.out Log Upload Time:?? ?? 18 20:48:45 +0800 2021 LogLength:37 Log Contents: /bin/python: No module named pyflink ---- ??: "user-zh" https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalid wrote: Hi ! nbsp; nbsp;java Table api??python udf pythonjm??/bin/python: No module named pyflink?? ./flink-1.13.2/bin/flinknbsp; run-application -t yarn-applicationnbsp; -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"nbsp; -Dyarn.application.queue=d -p 1nbsp; -pyarch /opt/venv.zip -pyexec venv.zip/venv/bin/pythonnbsp; -pyfs /opt/test.pynbsp; -c test.PyUDFTestnbsp; /opt/flink-python-test-1.0-SNAPSHOT.jar ?? Caused by: java.lang.RuntimeException: Python callback server start failed! nbsp; nbsp; nbsp; nbsp; at org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167) ~[flink-python_2.11-1.13.2.jar:1.13.2] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88) ~[flink-python_2.11-1.13.2.jar:1.13.2] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84) ~[flink-python_2.11-1.13.2.jar:1.13.2] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; 
nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824) ~[flink-dist_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:129) ~[flink-python_2.11-1.13.2.jar:1.13.2] nbsp; nbsp; nbsp; nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111] nbsp; nbsp; nbsp; nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111] nbsp; nbsp; nbsp; nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111] nbsp; nbsp; nbsp; nbsp; at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:606) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] nbsp; nbsp; nbsp; nbsp; at 
org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
????????????????????downloads/setup-pyflink-virtual-env.sh????
jar:1.13.1] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at test.PyUDFTest.main(PyUDFTest.java:22) ~[flink-python-test-1.0-SNAPSHOT.jar:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.1.jar:1.13.1] ... 10 more 2021-11-18 20:48:44,458 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:45070 2021-11-18 20:48:44,458 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting YarnApplicationClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 2021-11-18 20:48:44,458 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint. 
2021-11-18 20:48:44,474 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-dd82c3c0-f457-492d-8e64-5ae74fe9abbd/flink-web-ui 2021-11-18 20:48:44,474 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://cdh5node3:40216 lost leadership 2021-11-18 20:48:44,474 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete. 2021-11-18 20:48:44,474 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components. 2021-11-18 20:48:44,475 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess. 2021-11-18 20:48:44,475 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping dispatcher akka.tcp://flink@cdh5node3:34697/user/rpc/dispatcher_1. 2021-11-18 20:48:44,476 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping all currently running jobs of dispatcher akka.tcp://flink@cdh5node3:34697/user/rpc/dispatcher_1. LogType:jobmanager.out Log Upload Time:?? ?? 18 20:48:45 +0800 2021 LogLength:37 Log Contents: /bin/python: No module named pyflink ---- ??: "user-zh" https://nightlies.apache.org/flink/flink-docs-release-1.12/downloads/setup-pyflink-virtual-env.sh ?? 2021-11-18 15:05:03??"Asahi Lee" <978466...@qq.com.INVALID ?? Hi! nbsp; nbsp; flink??nbsp;nbsp;setup-pyflink-virtual-env.sh python?? https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/
??????????????downloads/setup-pyflink-virtual-env.sh????
Hi! flink??setup-pyflink-virtual-env.sh python?? https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/
flink 1.13.2 ?? Java/Scala ?????????? Python UDF??????????yarn-application??????yarn????????????????????????pyflink?
HI! ??flink 1.13.2??java table apipython udf??yarn-applicationyarn??pyflink?
flink 1.13.1 ????yarn-application????????????????mysql??????????????hive??????????????16G+??Taskmangaer????
hi! ??flink sqlmysql??hive??yarn-application??16G??
flink 1.13.2 ????avg??????int????????????????????????int??????????????????
hi! ??flink 1.13.2?? int ??avg?? int double??decimal??bug
?????? flink 1.13.1 ????hive????????????
2021-09-06 11:20:32,787 WARN org.apache.hadoop.hive.metastore.MetaStoreDirectSql: [pool-4-thread-192]: Failed to execute [select "COLUMN_NAME", "COLUMN_TYPE", "LONG_LOW_VALUE", "LONG_HIGH_VALUE", "DOUBLE_LOW_VALUE", "DOUBLE_HIGH_VALUE", "BIG_DECIMAL_LOW_VALUE", "BIG_DECIMAL_HIGH_VALUE", "NUM_NULLS", "NUM_DISTINCTS", "AVG_COL_LEN", "MAX_COL_LEN", "NUM_TRUES", "NUM_FALSES", "LAST_ANALYZED" from "TAB_COL_STATS" where "DB_NAME" = ? and "TABLE_NAME" = ? and "COLUMN_NAME" in (?,?)] with parameters [cosldatacenter, dw_zyxx_test, a, ??] javax.jdo.JDODataStoreException: Error executing SQL query "select "COLUMN_NAME", "COLUMN_TYPE", "LONG_LOW_VALUE", "LONG_HIGH_VALUE", "DOUBLE_LOW_VALUE", "DOUBLE_HIGH_VALUE", "BIG_DECIMAL_LOW_VALUE", "BIG_DECIMAL_HIGH_VALUE", "NUM_NULLS", "NUM_DISTINCTS", "AVG_COL_LEN", "MAX_COL_LEN", "NUM_TRUES", "NUM_FALSES", "LAST_ANALYZED" from "TAB_COL_STATS" where "DB_NAME" = ? and "TABLE_NAME" = ? and "COLUMN_NAME" in (?,?)". at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451) at org.datanucleus.api.jdo.JDOQuery.executeWithArray(JDOQuery.java:321) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeWithArray(MetaStoreDirectSql.java:1611) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.access$1300(MetaStoreDirectSql.java:83) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql$13.run(MetaStoreDirectSql.java:1173) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.runBatched(MetaStoreDirectSql.java:1656) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getTableStats(MetaStoreDirectSql.java:1183) at org.apache.hadoop.hive.metastore.ObjectStore$9.getSqlResult(ObjectStore.java:6590) at org.apache.hadoop.hive.metastore.ObjectStore$9.getSqlResult(ObjectStore.java:6587) at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2616) at org.apache.hadoop.hive.metastore.ObjectStore.getTableColumnStatisticsInternal(ObjectStore.java:6586) at 
org.apache.hadoop.hive.metastore.ObjectStore.getTableColumnStatistics(ObjectStore.java:6580) at sun.reflect.GeneratedMethodAccessor74.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:103) at com.sun.proxy.$Proxy7.getTableColumnStatistics(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table_statistics_req(HiveMetaStore.java:4644) at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:140) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:99) at com.sun.proxy.$Proxy9.get_table_statistics_req(Unknown Source) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table_statistics_req.getResult(ThriftHiveMetastore.java:11028) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table_statistics_req.getResult(ThriftHiveMetastore.java:11012) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:110) at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:106) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.hive.metastore.TUGIBasedProcessor.process(TUGIBasedProcessor.java:118) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) NestedThrowablesStackTrace: java.sql.SQLException: Illegal mix of collations (latin1_bin,IMPLICIT), (utf8mb4_general_ci,COERCIBLE), (utf8mb4_general_ci,COERCIBLE) for operation ' IN ' at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3933) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3869) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524) at
flink 1.13.1 ????hive????????????
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Failed to get table column stats of table cosldatacenter.ods_zyxx_coslzj_towing at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.1.jar:1.13.1] ... 10 more Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Failed to get table column stats of table cosldatacenter.ods_zyxx_coslzj_towing at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:284) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51) 
~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.scriptRunning(ScriptEngine.java:201) ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?] at com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.execute(ScriptEngine.java:171) ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?] at com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.main(ScriptEngine.java:128) ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_141] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_141] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_141] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_141] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.1.jar:1.13.1] ... 
10 more Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to get table column stats of table cosldatacenter.ods_zyxx_coslzj_towing at org.apache.flink.table.catalog.hive.HiveCatalog.getTableColumnStatistics(HiveCatalog.java:1635) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:121) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:107) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:82) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_141] at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:77) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at
flink 1.13.1 ????hive orc??????????????????????????
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1024 at org.apache.orc.impl.TreeReaderFactory$TreeReader.nextVector(TreeReaderFactory.java:269) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:1007) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.orc.impl.ConvertTreeReaderFactory$DateFromTimestampTreeReader.nextVector(ConvertTreeReaderFactory.java:2115) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:2012) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1300) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.orc.nohive.shim.OrcNoHiveShim.nextBatch(OrcNoHiveShim.java:94) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.orc.nohive.shim.OrcNoHiveShim.nextBatch(OrcNoHiveShim.java:41) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.readBatch(AbstractOrcFileInputFormat.java:260) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_141] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_141] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_141] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_141]
flink hive ??????????instr????????????????????????
hi! ??flink 1.13.1??hivesql, ?? col1??string: ab'cd ,??instr ' ?? instr(col1, '\'') instr(col1, ) instr(col1, '\''') instr(col1, '\\''')
Could not execute ALTER TABLE check_rule_base_hive_catalog.test_flink.test_partition DROP PARTITION (dt=2021-08-31)
hi! ??flink 1.13.1??hivedtsql hive?? Caused by: org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException: PartitionSpec CatalogPartitionSpec{{dt=2021-08-31}} does not match partition keys [dt, xtlx, sblx] of table test_flink.test_partition in catalog check_rule_base_hive_catalog. at org.apache.flink.table.catalog.hive.HiveCatalog.getOrderedFullPartitionValues(HiveCatalog.java:1189) ~[flink-sql-connector-hive-2.3.6_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.catalog.hive.HiveCatalog.dropPartition(HiveCatalog.java:899) ~[flink-sql-connector-hive-2.3.6_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:982) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
flink ??????????????truncate table????
hi! flink??truncate table??flink hivetruncate table??
flink 1.13.1??????????hive??????????insert overwrite??????????????????????????????????????????
hi?? ??sqlselect0??hive INSERT OVERWRITE target_table SELECT * from source_table where id 10;
flink 1.13.1 ????hive????sql, ????Can not make a progress: all selected inputs are already finished
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_141] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_141] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_141] at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_141] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_141] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_141] at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_141] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_141] at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1] at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1] Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application. ... 11 more Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.1.jar:1.13.1] ... 10 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_141] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_141] at com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.scriptRunning(ScriptEngine.java:200) ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?] at com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.execute(ScriptEngine.java:169) ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?] at com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.main(ScriptEngine.java:127) ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?] 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_141] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_141] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_141] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_141] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
?????? flink 1.13.1 ????hive??????????hive sql????????
SqlParser.parseStmtlist()sqlSqlNode toString()??unicode ---- ??: "user-zh"
?????? flink 1.13.1 ????hive??????????hive sql????????
hive 1.1.0 ---- ??: "user-zh"
flink 1.13.1 ????hive??????like????????????
hihive??sql??like ?? org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: like(VARCHAR(255), STRING NOT NULL) org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: like(STRING, STRING NOT NULL) If you think this function should be supported, you can create an issue and start a discussion for it. at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:845) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:845) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at scala.Option.getOrElse(Option.scala:121) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:844) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:849) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at scala.Option.getOrElse(Option.scala:121) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:837) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:173) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:50) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:80) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:79) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:79) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] at
?????? flink 1.13.1 ????hive??????????hive sql????????
CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramvalue`( `paramvalue_id` string COMMENT '', `platform_id` string COMMENT '', `equipment_id` string COMMENT '', `param_id` string COMMENT '', `param_value` string COMMENT '', `remark` string COMMENT '', `create_time` string COMMENT '', `creator` string COMMENT '', `update_time` string COMMENT '', `update_person` string COMMENT '', `record_flag` double COMMENT '', `subject_id` string COMMENT '', `output_unit` string COMMENT '', `show_seq` double COMMENT '') COMMENT '' ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ( 'field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramvalue' TBLPROPERTIES ( 'COLUMN_STATS_ACCURATE'='false', 'last_modified_by'='root', 'last_modified_time'='1621834335', 'numFiles'='0', 'numRows'='-1', 'rawDataSize'='-1', 'totalSize'='0', 'transient_lastDdlTime'='1621834335') CREATE TABLE `cosldatacenter.ods_emp_md_large_equip`( `large_equip_id` string COMMENT '', `equip_name` string COMMENT '', `equip_type` string COMMENT '', `equip_function` string COMMENT '', `equip_board` string COMMENT '', `ship_yard` string COMMENT '', `manufacturer_date` string COMMENT '', `enqueue_date` string COMMENT '', `dockrepair_date` string COMMENT '', `scrap_date` string COMMENT '', `enqueue_mode` string COMMENT '', `work_for_org` string COMMENT '', `work_in_org` string COMMENT '', `old_age` string COMMENT '', `create_time` date COMMENT '', `creator` string COMMENT '', `update_time` date COMMENT '', `update_person` string COMMENT '', `record_flag` double COMMENT '', `data_timestamp` string COMMENT '', `work_unit_id` string COMMENT '', `work_status` string COMMENT '', `work_location` string COMMENT '', `work_area` string COMMENT '', `equip_code` string COMMENT '', 
`shi_main_power` double COMMENT '', `shi_total_len` double COMMENT '', `shi_type_width` double COMMENT '', `shi_type_depth` double COMMENT '', `shi_design_draft` double COMMENT '', `shi_total_tonnage` double COMMENT '', `shi_load_tonnage` double COMMENT '', `remark` string COMMENT '', `unit_classification1` string COMMENT '', `unit_classification2` string COMMENT '') COMMENT '' ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ( 'field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_md_large_equip' TBLPROPERTIES ( 'COLUMN_STATS_ACCURATE'='false', 'last_modified_by'='root', 'last_modified_time'='1621834338', 'numFiles'='0', 'numRows'='-1', 'rawDataSize'='-1', 'totalSize'='0', 'transient_lastDdlTime'='1621834338') CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramdef`( `param_id` string COMMENT '', `iadc_id` string COMMENT '', `param_code` string COMMENT '', `param_en` string COMMENT '', `param_cn` string COMMENT '', `output_standard` string COMMENT '', `output_unit` string COMMENT '', `param_type` string COMMENT '', `param_value` string COMMENT '', `remark` string COMMENT '', `create_time` string COMMENT '', `creator` string COMMENT '', `update_time` string COMMENT '', `update_person` string COMMENT '', `record_flag` double COMMENT '') COMMENT '' ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ( 'field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramdef' TBLPROPERTIES ( 'COLUMN_STATS_ACCURATE'='false', 'last_modified_by'='root', 'last_modified_time'='1621834335', 
'numFiles'='0', 'numRows'='-1', 'rawDataSize'='-1', 'totalSize'='0', 'transient_lastDdlTime'='1621834335') CREATE TABLE `cosldatacenter.dw_riginfoparam`( `large_equip_id` string, `equip_code` string, `equip_name` string, `enqueue_date` string, `shi_total_len` double, `shi_type_width` double, `shi_type_depth` double, `moonpool` string, `maxwindvelocity` string, `maxwaveheight` string, `airgap` string, `maxopewaterdepth` string, `drilldepthcap` string, `drillvl` string, `drillwater` string, `potablewater` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' WITH SERDEPROPERTIES ( 'field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/dw_riginfoparam'
?????? flink 1.13.1, metrics?????????????? Unnamed
sql??sql ---- ??: "user-zh"
flink 1.13.1, metrics?????????????? Unnamed
Hi?? ??sql??Unnamed??bug?? ?? node103.taskmanager.container_1627469681067_0030_01_02.e621b91ec4a34ababeb6b0e2c4d6f22b.Source: HiveSource-qc_test_t_student_score - Calc(select=[id, CAST(_UTF-16LE'Bob':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, class_id, class_name, score, _UTF-16LE'9bdb0e98cc5b4800ae3b56575c442225':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS rule_id, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS task_batch_id], where=[(name = _UTF-16LE'Bob':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) - Map - Sink: Unnamed.0.Shuffle.Netty.Input.Buffers.inputFloatingBuffersUsage node103.taskmanager.container_1627469681067_0030_01_02.e621b91ec4a34ababeb6b0e2c4d6f22b.Source: HiveSource-qc_test_t_student_score - Calc(select=[id, CAST(_UTF-16LE'Bob':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, class_id, class_name, score, _UTF-16LE'9bdb0e98cc5b4800ae3b56575c442225':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS rule_id, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS task_batch_id], where=[(name = _UTF-16LE'Bob':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) - Map - Sink: Unnamed.0.Shuffle.Netty.Input.Buffers.inPoolUsage node103.taskmanager.container_1627469681067_0030_01_02.e621b91ec4a34ababeb6b0e2c4d6f22b.Sink: Unnamed.0.numRecordsIn sql?? CREATE CATALOG `qc_hive_catalog` WITH ( 'type' = 'hive', 'default-database' = 'qc_test' ); USE CATALOG `qc_hive_catalog`; CREATE TABLE IF NOT EXISTS QC_RESULT_T_STUDENT_SCORE ( id STRING, NAME STRING, class_id STRING, class_name STRING, score INTEGER, rule_id STRING, task_batch_id STRING ) WITH ( 'is_generic' = 'false', 'connector' = 'hive' ); INSERT INTO QC_RESULT_T_STUDENT_SCORE SELECT id, NAME, class_id, class_name, score, cast( '9bdb0e98cc5b4800ae3b56575c442225' AS STRING ) AS rule_id, cast( '3' AS STRING ) AS task_batch_id FROM t_student_score WHERE t_student_score.NAME = 'Bob';
?????? flink 1.13.1 ????hive??????????hive sql????????
hi! ??else??sqlInvalid table alias or column reference 'u' ??sql'u' CREATE CATALOG `tempo_df_hive_default_catalog` WITH( 'type' = 'hive', 'default-database' = 'default' ); USE CATALOG tempo_df_hive_default_catalog; CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` ( f0 INT ); insert into cosldatacenter.dw_riginfoparam select c.LARGE_EQUIP_ID, c.EQUIP_CODE, c.EQUIP_NAME, c.ENQUEUE_DATE, c.SHI_TOTAL_LEN, c.SHI_TYPE_WIDTH, c.SHI_TYPE_DEPTH, case when b.param_cn = '' then a.param_value else null end as Moonpool, case when b.param_cn = '' then a.param_value else null end as MaxWindvelocity, case when b.param_cn = '' then a.param_value else null end as MaxWaveheight, case when b.param_cn = '' then a.param_value else null end as Airgap, case when b.param_cn = '' then a.param_value else null end as MaxOpeWaterdepth, case when b.param_cn = '' then a.param_value else null end as DrilldepthCap, case when b.param_cn = '' then a.param_value else null end as DrillVL, case when b.param_cn = '??' then a.param_value else null end as DrillWater, case when b.param_cn = '??' 
then a.param_value else null end as PotableWater from cosldatacenter.ods_emp_maindata_iadc_paramvalue a inner join cosldatacenter.ods_emp_maindata_iadc_paramdef b on a.param_id = b.param_id inner join cosldatacenter.ods_emp_md_large_equip c on a.SUBJECT_ID=c.LARGE_EQUIP_ID; INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ; org.apache.hadoop.hive.ql.parse.SemanticException: Line 2:178 Invalid table alias or column reference 'u': (possible column names are: a.paramvalue_id, a.platform_id, a.equipment_id, a.param_id, a.param_value, a.remark, a.create_time, a.creator, a.update_time, a.update_person, a.record_flag, a.subject_id, a.output_unit, a.show_seq, b.param_id, b.iadc_id, b.param_code, b.param_en, b.param_cn, b.output_standard, b.output_unit, b.param_type, b.param_value, b.remark, b.create_time, b.creator, b.update_time, b.update_person, b.record_flag, c.large_equip_id, c.equip_name, c.equip_type, c.equip_function, c.equip_board, c.ship_yard, c.manufacturer_date, c.enqueue_date, c.dockrepair_date, c.scrap_date, c.enqueue_mode, c.work_for_org, c.work_in_org, c.old_age, c.create_time, c.creator, c.update_time, c.update_person, c.record_flag, c.data_timestamp, c.work_unit_id, c.work_status, c.work_location, c.work_area, c.equip_code, c.shi_main_power, c.shi_total_len, c.shi_type_width, c.shi_type_depth, c.shi_design_draft, c.shi_total_tonnage, c.shi_load_tonnage, c.remark, c.unit_classification1, c.unit_classification2) ---- ??: "user-zh"
flink 1.13.1 ????hive??????????hive sql????????
Hi?? ??flink 1.13.1??hive sql?? CREATE CATALOG `tempo_df_hive_default_catalog` WITH( 'type' = 'hive', 'default-database' = 'default' ); USE CATALOG tempo_df_hive_default_catalog; CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` ( f0 INT ); use cosldatacenter; INSERT INTO `dw_riginfoparam` ( `large_equip_id`, `equip_code`, `equip_name`, `enqueue_date`, `shi_total_len`, `shi_type_width`, `shi_type_depth`, `moonpool` ) SELECT mle.`large_equip_id` , mle.`equip_code` , mle.`equip_name` , mle.`enqueue_date` , mle.`shi_total_len` , mle.`shi_type_width` , mle.`shi_type_depth`, CASE WHEN mipd.`param_cn` = '' THEN mipv.`param_value` END AS `Moonpool` from `ods_emp_maindata_iadc_paramvalue` mipv INNER JOIN `ods_emp_maindata_iadc_paramdef` mipd ON mipv.`param_id` = mipd.`param_id` inner JOIN `ods_emp_md_large_equip` mle ON mipv.`SUBJECT_ID`= mle.`LARGE_EQUIP_ID`; INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ; ?? Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 2:195 Invalid table alias or column reference 'u': (possible column names are: mipv.paramvalue_id, mipv.platform_id, mipv.equipment_id, mipv.param_id, mipv.param_value, mipv.remark, mipv.create_time, mipv.creator, mipv.update_time, mipv.update_person, mipv.record_flag, mipv.subject_id, mipv.output_unit, mipv.show_seq, mipd.param_id, mipd.iadc_id, mipd.param_code, mipd.param_en, mipd.param_cn, mipd.output_standard, mipd.output_unit, mipd.param_type, mipd.param_value, mipd.remark, mipd.create_time, mipd.creator, mipd.update_time, mipd.update_person, mipd.record_flag, mle.large_equip_id, mle.equip_name, mle.equip_type, mle.equip_function, mle.equip_board, mle.ship_yard, mle.manufacturer_date, mle.enqueue_date, mle.dockrepair_date, mle.scrap_date, mle.enqueue_mode, mle.work_for_org, mle.work_in_org, mle.old_age, mle.create_time, mle.creator, mle.update_time, mle.update_person, mle.record_flag, mle.data_timestamp, mle.work_unit_id, mle.work_status, mle.work_location, mle.work_area, 
mle.equip_code, mle.shi_main_power, mle.shi_total_len, mle.shi_type_width, mle.shi_type_depth, mle.shi_design_draft, mle.shi_total_tonnage, mle.shi_load_tonnage, mle.remark, mle.unit_classification1, mle.unit_classification2) at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genAllExprNodeDesc(HiveParserSemanticAnalyzer.java:2467) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genExprNodeDesc(HiveParserSemanticAnalyzer.java:2421) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genSelectLogicalPlan(HiveParserCalcitePlanner.java:2314) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:2772) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.logicalPlan(HiveParserCalcitePlanner.java:285) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:273) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:326) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:274) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1] at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
?????? flink 1.13.1 sql hive is_generic = false ??????????????????????
??flink 1.12.0?? ---- ??: "user-zh"
flink 1.13.1 sql hive is_generic = false ??????????????????????
hi! ??flink 1.13.1??sqlhive ??hive 2.3.6??flink-sql-connector-hive-2.3.6 package com.meritdata.cloud.flink.test; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class Test { public static void main(String[] args) { EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnvironment = TableEnvironment.create(bbSettings); tableEnvironment.executeSql("create catalog myhive with (" + " 'type' = 'hive', " + " 'default-database' = 'default', " + ")"); tableEnvironment.executeSql("use catalog myhive"); tableEnvironment.executeSql("create table if not exists q1 " + "( id string ) " + "with ('is_generic' = 'false')"); /** * hive, * desc formatted q1; * * col_name data_type comment * * * Table Parameters: * flink.is_generic false * flink.schema.0.data-type VARCHAR(2147483647) * flink.schema.0.name id * transient_lastDdlTime 1627279802 * */ } }
flink 1.13.1 ????????row(a, b)??????????????
hi?? 1. flink 1.13.1 ??row(a,b)bug?? 2. rowrowrowname??name ?? package test; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.ResolvedSchema; public class DataGenTest { public static void main(String[] args) { StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment); tableEnvironment.executeSql("CREATE TABLE datagen (\n" + " f_sequence INT,\n" + " f_random INT,\n" + " f_random_str STRING,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='5',\n" + " 'fields.f_sequence.kind'='sequence',\n" + " 'fields.f_sequence.start'='1',\n" + " 'fields.f_sequence.end'='1000',\n" + " 'fields.f_random.min'='1',\n" + " 'fields.f_random.max'='1000',\n" + " 'fields.f_random_str.length'='10'\n" + ")"); Table table = tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from datagen"); ResolvedSchema resolvedSchema = table.getResolvedSchema(); System.out.println(resolvedSchema); /** * ?? * ( * `c` ROW<`EXPR$0` INT, `EXPR$1` INT NOT NULL * ) * rowrowrow??c1, c2?? * ( * `c` ROW<`c1` INT, `c2` INT NOT NULL * ) */ Table table1 = tableEnvironment.sqlQuery("select * from " + table); /** * sql?? 
* Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: * validated type: * RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL * converted type: * RecordType(RecordType(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL * rel: * LogicalProject(c=[ROW($0, $1)]) * LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3]) * LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP]) * LogicalTableScan(table=[[default_catalog, default_database, datagen]]) */ ResolvedSchema resolvedSchema1 = table1.getResolvedSchema(); System.out.println(resolvedSchema1); table.execute().print(); } }
flink 1.13.1 org.apache.flink.table.catalog.Column ????????????????Serializable????
hi?? org.apache.flink.table.catalog.Column Serializable??Serializable??
??????Flink SQL ????????????DynamoDB
https://flink-packages.org/packages/streaming-flink-dynamodb-connector ---- ??: "user-zh"
flink 1.13.0 ??????flink sql ??????????????????????????????????schema.name
hi! flink jdbc ?? table-name?? CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'org.users' );
Flink CEP????????????????flink??????????????cep????
hi?? flink cep??cep
??????????????????flink ????????????????????????kafka,mysql??
?? ??flink ??
flink 1.12.2??????????????????????????????????????????????????
????flink 1.12??flinkDataStream API,??RuntimeExecutionMode.BATCH?? package com.meritdata.cloud.tempo.dw.flink.test.bug; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class JDBCTest { public static void main(String[] args) { test(); /** * ?? * EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() * .useBlinkPlanner().inBatchMode().build(); * TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); * ++--+ * | a | EXPR$1 | * ++--+ * | 2 |1 | * | 3 |2 | * | 1 |2 | * | 4 |1 | * ++--+ */ //test1(); /** * ??API * StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); * streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); * StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv); * +++--+ * | op | a | EXPR$1 | * +++--+ * | +I | 2 |1 | * | +I | 1 |1 | * | +I | 4 |1 | * | -U | 1 |1 | * | +U | 1 |2 | * | +I | 3 |1 | * | -U | 3 |1 | * | +U | 3 |2 | * +++--+ */ } public static void test() { EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); bbTableEnv.executeSql("CREATE TABLE ab (" + " a STRING, " + " b INT " + ") WITH (" + " 'connector' = 'jdbc'," + " 'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," + " 'username' = 'root'," + " 'password' = 'root'," + " 'table-name' = 'ab'" + " )"); bbTableEnv.sqlQuery("select a, count(b) from ab group by a").execute().print(); } public static void test1() { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv); tableEnv.executeSql("CREATE TABLE ab (" + " a STRING, " + " b INT " + ") WITH (" + " 'connector' = 'jdbc'," + " 'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," + " 'username' = 'root'," + " 'password' = 'root'," + " 'table-name' = 'ab'" + " )"); tableEnv.sqlQuery("select a, count(b) from ab group by a").execute().print(); } }
flink 1.12.2??????DataStream????Table??????rowtime????????????????
?? ?? StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv); DataStream
FlinkUserCodeClassLoader??session??????????????????????????
?? ??flink-1.12.0,??flinkmain??URLClassLoader??http://a.jar??jarrest api jar/run??uber-jar, jobuber-jar
Table program cannot be compiled
?? flink 1.12.0??org.apache.flink.table.runtime.generated.CompileUtils.compile()DataStreamTable program cannot be compiled. This is a bug. Please file an issue.?? public abstract java.lang.Object org.apache.flink.api.java.functions.KeySelector.getKey(java.lang.Object) ??
??????flink 1.11.2 rowtime??proctime?? Interval Join ????????????
?? ??join ---- ??: "user-zh" <18868816...@163.com; :2020??11??25??(??) 7:31 ??:"user-zh"
flink 1.11.2 rowtime??proctime?? Interval Join ????????????
?? Interval Join??flink 1.11.2joinjoin?? l_table.l_rt = r_table.r_pt l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND??package join; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Test1 { public static void main(String[] args) { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String lTable = "CREATE TABLE l_table ( " + " l_a INT, " + " l_b string, " + " l_rt AS localtimestamp, " + " WATERMARK FOR l_rt AS l_rt " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.l_a.min'='1', " + " 'fields.l_a.max'='5', " + " 'fields.l_b.length'='5' " + ")"; bsTableEnv.executeSql(lTable); String rTable = "CREATE TABLE r_table ( " + " r_a INT, " + " r_b string, " + " r_pt AS proctime() " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.r_a.min'='1', " + " 'fields.r_a.max'='5', " + " 'fields.r_b.length'='5' " + ")"; bsTableEnv.executeSql(rTable); String printTable = "CREATE TABLE print (" + " l_a INT, " + " l_b string, " + " l_rt timestamp(3), " + " r_a INT, " + " r_b string, " + " r_pt timestamp(3) " + ") WITH ( " + " 'connector' = 'print' " + ") "; bsTableEnv.executeSql(printTable); // //Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); // ??Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. 
Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); bsTableEnv.executeSql("insert into print select * from " + joinTable); } }
??????flink 1.11.2 ????????????
??format=json??flinkjacksonjackson?? ---- ??: "Asahi Lee" <978466...@qq.com; :2020??11??14??(??) 4:58 ??:"user-zh"
flink 1.11.2 ????????????
?? ??flink sql 1.11.2??proctime()UTC??+08env.java.optsjvm??+08 json
?????? flink 1.11.2 ????????blink????????BatchTableEnvironment????
BatchTableEnvironmenttable to dataset; dataset to table ---- ??: "user-zh"
?????? flink 1.11.2 ????????blink????????BatchTableEnvironment????
??BatchTableEnvironment ---- ??: "user-zh"
flink 1.11.2 ????????blink????????BatchTableEnvironment????
?? ??flink 1.11.2??blink??batch // ** // BLINK BATCH QUERY // ** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);??blink??batch??BatchTableEnvironment??
??????1.11.1 ??OutOfMemoryError: Metaspace. ????
?? ??flink 1.11.2MySQL /* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. The MySQL Connector/J is licensed under the terms of the GPLv2 <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html;, like most MySQL Connectors. There are special exceptions to the terms and conditions of the GPLv2 as it is applied to this software, see the FOSS License Exception <http://www.mysql.com/about/legal/licensing/foss-exception.html;. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ package com.mysql.cj.jdbc; import java.lang.ref.Reference; import com.mysql.cj.jdbc.NonRegisteringDriver.ConnectionPhantomReference; public class AbandonedConnectionCleanupThread extends Thread { private static boolean running = true; private static Thread threadRef = null; public AbandonedConnectionCleanupThread() { super("Abandoned connection cleanup thread"); } @Override public void run() { threadRef = this; while (running) { try { Referencehttps://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size [2] https://issues.apache.org/jira/browse/FLINK-16681 Best, Hailong Wang ?? 2020-11-04 19:08:37??"Asahi Lee" <978466...@qq.com ?? ?? nbsp; nbsp; nbsp; ??flink sql,JDBC??mysql??OutOfMemoryError: Metaspace.mysqlconnection??class??
1.11.1 ??OutOfMemoryError: Metaspace. ????
?? ??flink sql,JDBC??mysql??OutOfMemoryError: Metaspace.mysqlconnection??class??
flink 1.11.1 web ui ????????source??????detail??????recoreds sent????????????????0
?? flink??web uisource??detailRecords Sent??0??
?????? flink 1.11.2 Table sql??????????????????2????????????????????
?? flink SQL??rebalance---- ??:zilongnbsp;xiao
flink 1.11.2 Table sql??????????????????2????????????????????
?? ??flink sqlkafkasql??kafka1??2kafka stream apisql api??sql
?????? 1.11????????????????????????????????????????????????????????????????
?? StreamTableEnvironment.from("")??datagen??table?? ?? package org.apache.flink.playgrounds.spendreport; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.Arrays; public class Test2 { public static void main(String[] args) { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String sourceDDL = "CREATE TABLE datagen ( " + " f_random INT," + " f_random_str STRING " + " ) WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='10'," + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='10', " + " 'fields.f_random_str.length'='10' " + " ) "; bsTableEnv.executeSql(sourceDDL); for (int i = 0; i < 10; i++) { Table datagen = bsTableEnv.from("datagen"); System.out.println(datagen); } System.out.println("-" + Arrays.toString(bsTableEnv.listTables())); } } UnnamedTable$0 UnnamedTable$1 UnnamedTable$2 UnnamedTable$3 UnnamedTable$4 UnnamedTable$5 UnnamedTable$6 UnnamedTable$7 UnnamedTable$8 UnnamedTable$9 -[UnnamedTable$0, UnnamedTable$1, UnnamedTable$2, UnnamedTable$3, UnnamedTable$4, UnnamedTable$5, UnnamedTable$6, UnnamedTable$7, UnnamedTable$8, UnnamedTable$9, datagen]
1.11????????????????????????????????????????????????????????????????
?? ??StreamTableEnvironment.from("") ??package kafka; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Test3 { public static void main(String[] args) { // StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // ?? String inTablePath = "CREATE TABLE datagen ( " + " id INT, " + " total string, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.id.min'='1', " + " 'fields.id.max'='10', " + " 'fields.total.length'='10' " + ")"; // ?? bsTableEnv.executeSql(inTablePath); Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from datagen"); bsTableEnv.createTemporaryView("table1", table); Table table1 = bsTableEnv.from("table1"); System.out.println(table1); // ??table1 Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from table1"); System.out.println(queryT.getSchema()); bsTableEnv.sqlQuery("select table1.id from " + bsTableEnv.from("table1")); } }
?????? 1.11??????????TableEnvironment.executeSql("insert into ...")??job??????????????
??sqljob?? ---- ??: "user-zh"
1.11??????????TableEnvironment.executeSql("insert into ...")??job??????????????
?? insert into??job ?? EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); String sourceDDL = "CREATE TABLE datagen ( " + " f_random INT, " + " f_random_str STRING, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='10', " + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='5', " + " 'fields.f_random_str.length'='10' " + ")"; bsTableEnv.executeSql(sourceDDL); Table datagen = bsTableEnv.from("datagen"); System.out.println(datagen.getSchema()); String sinkDDL = "CREATE TABLE print_table (" + " f_random int," + " c_val bigint, " + " wStart TIMESTAMP(3) " + ") WITH ('connector' = 'print') "; bsTableEnv.executeSql(sinkDDL); System.out.println(bsTableEnv.from("print_table").getSchema()); Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random"); bsTableEnv.executeSql("insert into print_table select * from " + table);
1.11????????????????????Cannot have more than one execute() or executeAsync() call in a single environment
hello! flink??idea Cannot have more than one execute() or executeAsync() call in a single environment job manager?? org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution()??139
????datagen connector??????????????????????????????????????????????????????????
1. package kafka; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class DataGenTest { public static void main(String[] args) { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String sourceTableDDL = "CREATE TABLE datagen ( " + " f_random INT, " + " f_random_str STRING, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='20', " + " 'fields.f_random.min'='1', " + " 'fields.f_random.max'='10', " + " 'fields.f_random_str.length'='10' " + ")"; bsTableEnv.executeSql(sourceTableDDL); bsTableEnv.executeSql("SELECT f_random, count(1) " + "FROM datagen " + "GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print(); } }2. log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. +-+--+ |f_random | EXPR$1 | +-+--+
?????? flink 1.11 ddl sql ????PROCTIME()????????csv????
filesystemcsv?? filesystem ---- ??: "user-zh" https://issues.apache.org/jira/browse/FLINK-18665 <https://issues.apache.org/jira/browse/FLINK-18665; ?? 2020??7??23????00:07??Asahi Lee <978466...@qq.com ?? 1. StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); nbsp; nbsp; nbsp; nbsp; EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); nbsp; nbsp; nbsp; nbsp; StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); nbsp; nbsp; nbsp; nbsp; String sourceTableDDL = "CREATE TABLE fs_table (" + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; user_id STRING," + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; order_amount DOUBLE," + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; dt TIMESTAMP(3)," + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; pt AS PROCTIME() " + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; " ) WITH (" + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; 'connector'='filesystem'," + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; 'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; "nbsp; 'format'='csv'" + nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; " )"; nbsp; nbsp; nbsp; nbsp; bsTableEnv.executeSql(sourceTableDDL); nbsp; nbsp; nbsp; nbsp; bsTableEnv.executeSql("select * from fs_table").print(); 2. csv order.csv zhangsan,12.34,2020-08-03 12:23:50 lisi,234.67,2020-08-03 12:25:50 wangwu,57.6,2020-08-03 12:25:50 zhaoliu,345,2020-08-03 12:28:50 3. nbsp;- Source: FileSystemTableSource(user_id, order_amount, dt, pt) -gt; Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -gt; SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED. java.io.IOException: Failed to deserialize CSV row. 
at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299) at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3. at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441) at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244) at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293) ... 5 more
flink 1.11 ddl sql ????PROCTIME()????????csv????
1. StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String sourceTableDDL = "CREATE TABLE fs_table (" + " user_id STRING," + " order_amount DOUBLE," + " dt TIMESTAMP(3)," + " pt AS PROCTIME() " + " ) WITH (" + " 'connector'='filesystem'," + " 'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," + " 'format'='csv'" + " )"; bsTableEnv.executeSql(sourceTableDDL); bsTableEnv.executeSql("select * from fs_table").print(); 2. csv order.csv zhangsan,12.34,2020-08-03 12:23:50 lisi,234.67,2020-08-03 12:25:50 wangwu,57.6,2020-08-03 12:25:50 zhaoliu,345,2020-08-03 12:28:50 3. - Source: FileSystemTableSource(user_id, order_amount, dt, pt) - Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) - SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED. java.io.IOException: Failed to deserialize CSV row. at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299) at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3. 
at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441) at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244) at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293) ... 5 more