Why does reading a Hive table report a null error (java.lang.RuntimeException: null)?

2022-09-13 Thread Asahi Lee




---- Original message ----
From: "user-zh"



Why does reading a Hive table report a null error (java.lang.RuntimeException: null)?

2022-09-08 Thread Asahi Lee
2022-09-09 11:36:42,866 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1) 
(2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on 
container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015).
java.lang.RuntimeException: null
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.lang.NullPointerException
at 
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) 
~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
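
The NullPointerException at BinaryWriter.write usually means a null value
reached a field the planner treats as NOT NULL. A minimal Flink SQL sketch of
one workaround (the column name edu_level is hypothetical; coalesce or filter
the nulls before they reach the serializer):

-- sketch only: edu_level stands in for whichever column carries the nulls
SELECT COALESCE(edu_level, '') AS edu_level
FROM ods_jt_hrs.ods_hrmis_HR_EMPL_Education;
-- or filter instead: ... WHERE edu_level IS NOT NULL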

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

2022-03-27 Thread Asahi Lee

Using Flink 1.13.2 with a HiveCatalog to query Hive tables, the job fails
with:


Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of 
recursions, without reducing partitions enough to be memory resident. Probably 
cause: Too many duplicate keys.
at 
org.apache.flink.table.runtime.hashtable.BinaryHashTable.buildTableFromSpilledPartition(BinaryHashTable.java:443)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.table.runtime.hashtable.BinaryHashTable.prepareNextPartition(BinaryHashTable.java:403)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.table.runtime.hashtable.BinaryHashTable.nextMatching(BinaryHashTable.java:265)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.table.runtime.operators.join.HashJoinOperator.endInput(HashJoinOperator.java:176)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.endOperatorInput(TableOperatorWrapper.java:124)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.endInput(BatchMultipleInputStreamOperator.java:56)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:93)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:428)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:87)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
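
The hash join spills build-side partitions and recurses; when enough rows
share a single key, the partitions can never shrink to fit in memory, which
is what this error reports. A hedged workaround, shown in SQL-client syntax
(deduplicating the build-side keys first also helps): disable hash joins so
the planner falls back to a sort-merge join.

-- sketch: table.exec.disabled-operators is a regular Flink table option
SET table.exec.disabled-operators=ShuffleHashJoin,BroadcastHashJoin;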

flink 1.13.2 reading a hive table throws a NullPointerException

2022-01-23 Thread Asahi Lee
2022-01-23 04:31:39,568 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - Source: HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents 
- Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)]) 
(1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING.
2022-01-23 04:31:39,570 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - Discarding the results produced by task execution 
07b2cd514c6b6d85f79ab5b953971f82.
2022-01-23 04:31:39,570 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - MultipleInput(readOrder=[0,0,1], 
members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND 
($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, 
numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, 
numperstype6, currentops, futureops, dayssincelast, $f109, jobid0, reportno0, 
boptestdate, bopfunctiontestdate], build=[right])\n:- Calc(select=[jobid, 
reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, 
numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, 
bigint(reportno) AS $f109])\n: +- HashJoin(joinType=[LeftOuterJoin], 
where=[((jobid = jobid0) AND ($f94 = reportno0))], select=[jobid, reportno, 
reportdate, depthprogress, numperstype1, numperstype2, numperstype3, 
numperstype4, numperstype5, numperstype6, currentops, futureops, $f94, jobid0, 
reportno0, dayssincelast], build=[right])\n:  :- 
Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, 
numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, 
currentops, futureops, bigint(reportno) AS $f94])\n:  : 
+- [#3] Exchange(distribution=[hash[jobid]])\n:  +- [#2] 
Exchange(distribution=[hash[jobid]])\n+- [#1] 
Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0, 
reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS $f4, 
futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2 AS $f8, 
numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11, numperstype6 
AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14]) - 
HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, 
$f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, 
$f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate, MAX($f14) AS 
lasbopfunctiontestdate]) - Calc(select=[$f0 AS jobid, $f1 AS reportno, 
string($f6) AS reportdate, bigint((nvl($f7, 0) + nvl($f8, 0)) + nvl($f9, 
0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS pobcnt, $f2 AS 
dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5 AS futureops, 
lasboptestdate, lasbopfunctiontestdate]) - Map - Sink: Unnamed (1/1) 
(3c555cbd6bf411a6111cf7eaab527d33) switched from CREATED to CANCELING.

Re: Question about downloads/setup-pyflink-virtual-env.sh

2021-11-22 Thread Asahi Lee
I am using the venv.zip with 1.14.0.




---- Original message ----
From: "user-zh"

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable

 
In 1.14.0 you can pass -pyclientexec venv.zip/venv/bin/python
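
A hedged recombination of the commands discussed in this thread, pointing
both the client-side and the worker-side interpreter into the archive (paths
are the poster's originals):

./flink-1.14.0/bin/flink run-application -t yarn-application
-Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
-pyarch /opt/venv.zip
-pyclientexec venv.zip/venv/bin/python
-pyexec venv.zip/venv/bin/python
-pyfs /opt/test.py
-c test.PyUDFTest
/opt/flink-python-test-1.0-SNAPSHOT.jar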


 On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalid>
 wrote:

 > After "source my_env/bin/activate" I set PYFLINK_CLIENT_EXECUTABLE, but
 > the jobmanager still reports "No module named pyflink" (the jobmanager
 > runs on YARN). Details:
 >
 >
 > > LogType:jobmanager.out
 > > Log Upload Time: Thu Nov 18 20:48:45 +0800 2021
 > > LogLength:37
 > > Log Contents:
 > > /bin/python: No module named pyflink
 >
 > ---- Original message ----
 > From: "user-zh" <dian0511...@gmail.com>
 > Date: 2021-11-19 (Fri) 9:38
 > To: "user-zh"
 >
 > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client
 > On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalid>
 > wrote:
 >
 > > Hi!
 > > I am calling a Python UDF from the Java Table API. The jobmanager
 > > reports "/bin/python: No module named pyflink". Command:
 > >
 > > ./flink-1.13.2/bin/flink
 > > run-application -t yarn-application
 > > -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
 > > -Dyarn.application.queue=d
 > > -p 1
 > > -pyarch /opt/venv.zip
 > > -pyexec venv.zip/venv/bin/python
 > > -pyfs /opt/test.py
 > > -c test.PyUDFTest
 > > /opt/flink-python-test-1.0-SNAPSHOT.jar
 > >
 > > Error:
 > > Caused by: java.lang.RuntimeException: Python callback server start
 > > failed!
 > > at
 > > org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167)
 > > ~[flink-python_2.11-1.13.2.jar:1.13.2]
 > > at
 > > org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88)
 > > ~[flink-python_2.11-1.13.2.jar:1.13.2]
 > > at
 > > org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84)
 > > ~[flink-python_2.11-1.13.2.jar:1.13.2]
 > > at
 > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
 > > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > > at
 > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
 > > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > > at
 > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
 > > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > > at
 > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(

Re: Question about downloads/setup-pyflink-virtual-env.sh

2021-11-21 Thread Asahi Lee
Hi!
 
I passed the Python options with -D instead of the -py flags:
./flink-1.14.0/bin/flink
run-application -t yarn-application
-Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
-Dyarn.application.queue=d
-Dpython.archives="/opt/venv.zip"
-Dpython.client.executable="venv.zip/venv/bin/python"
-Dpython.executable="venv.zip/venv/bin/python"
-Dpython.files="/opt/test.py"
-p 1
-c test.PyUDFTest
/opt/flink-python-test-1.0-SNAPSHOT.jar



It still fails with the following error.


taskmanager.log:


2021-11-19 13:59:24,030 ERROR 
/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:296
 [] - Error processing instruction 1. Original traceback is
Traceback (most recent call last):
 File 
"/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 289, in _execute
  response = task()
 File 
"/yarn/nm/usercache/tempodata/appcache/application_1637293573159_0150/python-dist-6a5f3e17-4ecf-4782-b9ac-0e944d69/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 362, in 

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable

In 1.14.0 you can pass -pyclientexec venv.zip/venv/bin/python


On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee <978466...@qq.com.invalid> wrote:

 After "source my_env/bin/activate" I set PYFLINK_CLIENT_EXECUTABLE, but the
 jobmanager still reports "No module named pyflink" (the jobmanager runs on
 YARN). Details:


 > LogType:jobmanager.out
 > Log Upload Time: Thu Nov 18 20:48:45 +0800 2021
 > LogLength:37
 > Log Contents:
 > /bin/python: No module named pyflink




 ---- Original message ----
 From: "user-zh" <dian0511...@gmail.com>
 Date: 2021-11-19 (Fri) 9:38
 To: "user-zh"

 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client

 On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalid>
 wrote:

 > Hi!
 > I am calling a Python UDF from the Java Table API. The jobmanager
 > reports "/bin/python: No module named pyflink". Command:
 >
 > ./flink-1.13.2/bin/flink
 > run-application -t yarn-application
 > -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
 > -Dyarn.application.queue=d
 > -p 1
 > -pyarch /opt/venv.zip
 > -pyexec venv.zip/venv/bin/python
 > -pyfs /opt/test.py
 > -c test.PyUDFTest
 > /opt/flink-python-test-1.0-SNAPSHOT.jar
 >
 > Error:
 > Caused by: java.lang.RuntimeException: Python callback server start
 > failed!
 > at
 > org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167)
 > ~[flink-python_2.11-1.13.2.jar:1.13.2]
 > at
 > org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88)
 > ~[flink-python_2.11-1.13.2.jar:1.13.2]
 > at
 > org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84)
 > ~[flink-python_2.11-1.13.2.jar:1.13.2]
 > at
 > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
 > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > at
 > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
 > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > at
 > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
 > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > at
 > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
 > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > at
 > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
 > ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 > at
 > org.apache.f

Re: Question about downloads/setup-pyflink-virtual-env.sh

2021-11-18 Thread Asahi Lee
After "source my_env/bin/activate" I set PYFLINK_CLIENT_EXECUTABLE, but the
jobmanager still reports "No module named pyflink" (the jobmanager runs on
YARN). Details:


 LogType:jobmanager.out
 Log Upload Time: Thu Nov 18 20:48:45 +0800 2021
 LogLength:37
 Log Contents:
 /bin/python: No module named pyflink




---- Original message ----
From: "user-zh"

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client

On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee <978466...@qq.com.invalid> wrote:

 Hi!
 I am calling a Python UDF from the Java Table API. The jobmanager reports
 "/bin/python: No module named pyflink". Command:


 ./flink-1.13.2/bin/flink
 run-application -t yarn-application
 -Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"
 -Dyarn.application.queue=d
 -p 1
 -pyarch /opt/venv.zip
 -pyexec venv.zip/venv/bin/python
 -pyfs /opt/test.py
 -c test.PyUDFTest
 /opt/flink-python-test-1.0-SNAPSHOT.jar
 /opt/flink-python-test-1.0-SNAPSHOT.jar



 Error:
 Caused by: java.lang.RuntimeException: Python callback server start failed!
 at
 org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167)
 ~[flink-python_2.11-1.13.2.jar:1.13.2]
 at
 org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88)
 ~[flink-python_2.11-1.13.2.jar:1.13.2]
 at
 org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84)
 ~[flink-python_2.11-1.13.2.jar:1.13.2]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:129)
 ~[flink-python_2.11-1.13.2.jar:1.13.2]
 at
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:1.8.0_111]
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_111]
 at
 java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
 at
 org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:606)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
 at
 org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
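
One hedged sanity check before submitting (not from the thread): confirm the
archived interpreter can actually import pyflink, since "No module named
pyflink" means the interpreter that was picked has no apache-flink installed.

unzip -q venv.zip && venv/bin/python -c "import pyflink; print(pyflink.__file__)"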

Re: Question about downloads/setup-pyflink-virtual-env.sh

2021-11-18 Thread Asahi Lee
jar:1.13.1]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) 
~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at test.PyUDFTest.main(PyUDFTest.java:22) 
~[flink-python-test-1.0-SNAPSHOT.jar:?]
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_111]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_111]
at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_111]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
... 10 more
2021-11-18 20:48:44,458 INFO 
org.apache.flink.runtime.blob.BlobServer 
 [] - Stopped BLOB server at 
0.0.0.0:45070
2021-11-18 20:48:44,458 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint   
 [] - Shutting YarnApplicationClusterEntryPoint down with application 
status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2021-11-18 20:48:44,458 INFO 
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2021-11-18 20:48:44,474 INFO 
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing 
cache directory /tmp/flink-web-dd82c3c0-f457-492d-8e64-5ae74fe9abbd/flink-web-ui
2021-11-18 20:48:44,474 INFO 
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
http://cdh5node3:40216 lost leadership
2021-11-18 20:48:44,474 INFO 
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down 
complete.
2021-11-18 20:48:44,474 INFO 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2021-11-18 20:48:44,475 INFO 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Stopping SessionDispatcherLeaderProcess.
2021-11-18 20:48:44,475 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  [] 
- Stopping dispatcher akka.tcp://flink@cdh5node3:34697/user/rpc/dispatcher_1.
2021-11-18 20:48:44,476 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  [] 
- Stopping all currently running jobs of dispatcher 
akka.tcp://flink@cdh5node3:34697/user/rpc/dispatcher_1.


LogType:jobmanager.out
Log Upload Time: Thu Nov 18 20:48:45 +0800 2021
LogLength:37
Log Contents:
/bin/python: No module named pyflink

---- Original message ----
From: "user-zh"

https://nightlies.apache.org/flink/flink-docs-release-1.12/downloads/setup-pyflink-virtual-env.sh
On 2021-11-18 15:05:03, "Asahi Lee" <978466...@qq.com.INVALID> wrote:
Hi!
Where can I download the setup-pyflink-virtual-env.sh script that the Flink
docs use to build the Python environment?
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/

Question about downloads/setup-pyflink-virtual-env.sh

2021-11-17 Thread Asahi Lee
Hi!
Where can I download the setup-pyflink-virtual-env.sh script that the Flink
docs use to build the Python environment?
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/faq/

flink 1.13.2: calling a Python UDF from Java/Scala and deploying with yarn-application, why does YARN report that pyflink cannot be found?

2021-11-08 Thread Asahi Lee
Hi!
Using flink 1.13.2 with the Java Table API to call a Python UDF, deployed in
yarn-application mode, YARN reports that pyflink cannot be found. Why?

flink 1.13.1 in yarn-application mode: syncing a MySQL table to Hive needs more than 16G of TaskManager memory

2021-11-04 Thread Asahi Lee
hi!
A flink sql job that syncs a MySQL table to Hive in yarn-application mode
needs more than 16G of TaskManager memory to finish. Why so much?

flink 1.13.2: avg over an int column returns int, silently truncating the result; is this a bug?

2021-09-27 Thread Asahi Lee
hi! In flink 1.13.2, AVG over an int column returns int, so the fractional
part of the average is silently dropped; shouldn't the result be double or
decimal? Is this a bug?
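
The planner derives AVG's result type from its argument type, so an int input
yields an int average. A hedged workaround (table and column names are
hypothetical): cast before aggregating.

SELECT AVG(CAST(score AS DOUBLE)) FROM t;
-- or, for exact arithmetic: SELECT AVG(CAST(score AS DECIMAL(10, 2))) FROM t;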

Re: flink 1.13.1 fails to read a hive table

2021-09-06 Thread Asahi Lee

2021-09-06 11:20:32,787 WARN 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql: [pool-4-thread-192]: 
Failed to execute [select "COLUMN_NAME", "COLUMN_TYPE", "LONG_LOW_VALUE", 
"LONG_HIGH_VALUE", "DOUBLE_LOW_VALUE", "DOUBLE_HIGH_VALUE", 
"BIG_DECIMAL_LOW_VALUE", "BIG_DECIMAL_HIGH_VALUE", "NUM_NULLS", 
"NUM_DISTINCTS", "AVG_COL_LEN", "MAX_COL_LEN", "NUM_TRUES", "NUM_FALSES", 
"LAST_ANALYZED" from "TAB_COL_STATS" where "DB_NAME" = ? and 
"TABLE_NAME" = ? and "COLUMN_NAME" in (?,?)] with parameters [cosldatacenter, 
dw_zyxx_test, a, ??]
javax.jdo.JDODataStoreException: Error executing SQL query "select 
"COLUMN_NAME", "COLUMN_TYPE", "LONG_LOW_VALUE", "LONG_HIGH_VALUE", 
"DOUBLE_LOW_VALUE", "DOUBLE_HIGH_VALUE", "BIG_DECIMAL_LOW_VALUE", 
"BIG_DECIMAL_HIGH_VALUE", "NUM_NULLS", "NUM_DISTINCTS", "AVG_COL_LEN", 
"MAX_COL_LEN", "NUM_TRUES", "NUM_FALSES", "LAST_ANALYZED" from 
"TAB_COL_STATS" where "DB_NAME" = ? and "TABLE_NAME" = ? and 
"COLUMN_NAME" in (?,?)".
at 
org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451)
at org.datanucleus.api.jdo.JDOQuery.executeWithArray(JDOQuery.java:321)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeWithArray(MetaStoreDirectSql.java:1611)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.access$1300(MetaStoreDirectSql.java:83)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql$13.run(MetaStoreDirectSql.java:1173)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.runBatched(MetaStoreDirectSql.java:1656)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getTableStats(MetaStoreDirectSql.java:1183)
at 
org.apache.hadoop.hive.metastore.ObjectStore$9.getSqlResult(ObjectStore.java:6590)
at 
org.apache.hadoop.hive.metastore.ObjectStore$9.getSqlResult(ObjectStore.java:6587)
at 
org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2616)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getTableColumnStatisticsInternal(ObjectStore.java:6586)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getTableColumnStatistics(ObjectStore.java:6580)
at sun.reflect.GeneratedMethodAccessor74.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:103)
at com.sun.proxy.$Proxy7.getTableColumnStatistics(Unknown Source)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table_statistics_req(HiveMetaStore.java:4644)
at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:140)
at 
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:99)
at com.sun.proxy.$Proxy9.get_table_statistics_req(Unknown Source)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table_statistics_req.getResult(ThriftHiveMetastore.java:11028)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table_statistics_req.getResult(ThriftHiveMetastore.java:11012)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at 
org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:110)
at 
org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:106)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at 
org.apache.hadoop.hive.metastore.TUGIBasedProcessor.process(TUGIBasedProcessor.java:118)
at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
NestedThrowablesStackTrace:
java.sql.SQLException: Illegal mix of collations (latin1_bin,IMPLICIT), 
(utf8mb4_general_ci,COERCIBLE), (utf8mb4_general_ci,COERCIBLE) for operation ' 
IN '
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3933)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3869)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
at 
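
The root cause above is a MySQL collation mismatch inside the Hive metastore:
the TAB_COL_STATS columns are latin1_bin while the query parameters (here a
Chinese column name) arrive as utf8mb4. A hedged sketch of the usual
inspection and repair, run against the metastore database only after backing
it up:

-- inspect the current collations first
SHOW FULL COLUMNS FROM TAB_COL_STATS;
-- one common repair: align the table with the connection charset
ALTER TABLE TAB_COL_STATS CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;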

flink 1.13.1 fails to read a hive table

2021-09-04 Thread Asahi Lee
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: SQL validation failed. Failed to get table column stats 
of table cosldatacenter.ods_zyxx_coslzj_towing
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. Failed to get table column stats of table 
cosldatacenter.ods_zyxx_coslzj_towing
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) 
~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:284)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.scriptRunning(ScriptEngine.java:201)
 ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?]
at 
com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.execute(ScriptEngine.java:171)
 ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?]
at 
com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.main(ScriptEngine.java:128)
 ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_141]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_141]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_141]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_141]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
... 10 more
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed 
to get table column stats of table cosldatacenter.ods_zyxx_coslzj_towing
at 
org.apache.flink.table.catalog.hive.HiveCatalog.getTableColumnStatistics(HiveCatalog.java:1635)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:121)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:107)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:82)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_141]
at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:77)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) 
~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 

flink 1.13.1: reading a hive orc table fails with ArrayIndexOutOfBoundsException

2021-09-04 Thread Asahi Lee
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1024
at 
org.apache.orc.impl.TreeReaderFactory$TreeReader.nextVector(TreeReaderFactory.java:269)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:1007)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.orc.impl.ConvertTreeReaderFactory$DateFromTimestampTreeReader.nextVector(ConvertTreeReaderFactory.java:2115)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:2012)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1300) 
~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.orc.nohive.shim.OrcNoHiveShim.nextBatch(OrcNoHiveShim.java:94) 
~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.orc.nohive.shim.OrcNoHiveShim.nextBatch(OrcNoHiveShim.java:41) 
~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.orc.AbstractOrcFileInputFormat$OrcVectorizedReader.readBatch(AbstractOrcFileInputFormat.java:260)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_141]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_141]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_141]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_141]

flink hive dialect: how do I escape a single quote for instr?

2021-09-01 Thread Asahi Lee
hi!  With flink 1.13.1 running hive sql: column col1 is a string containing
ab'cd, and I want instr to locate the ' character. None of the following
variants work:
   instr(col1, '\'')
   instr(col1, )
   instr(col1, '\''')
   instr(col1, '\\''')
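
A hedged note: when the statement is parsed by Flink's default (Calcite)
dialect, a single quote inside a string literal is written by doubling it, so
the one-character string containing just the quote is '''' (four quotes):

-- sketch; t stands in for the real table
SELECT INSTR(col1, '''') FROM t;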


Could not execute ALTER TABLE check_rule_base_hive_catalog.test_flink.test_partition DROP PARTITION (dt=2021-08-31)

2021-08-31 Thread Asahi Lee
hi!
With flink 1.13.1, dropping a hive partition by dt alone fails with:
Caused by: 
org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException: 
PartitionSpec CatalogPartitionSpec{{dt=2021-08-31}} does not match partition 
keys [dt, xtlx, sblx] of table test_flink.test_partition in catalog 
check_rule_base_hive_catalog.
at 
org.apache.flink.table.catalog.hive.HiveCatalog.getOrderedFullPartitionValues(HiveCatalog.java:1189)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.catalog.hive.HiveCatalog.dropPartition(HiveCatalog.java:899)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:982)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
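
The exception states that the spec must name every partition key of the
table, not just dt. A hedged sketch (the xtlx and sblx values are
placeholders):

ALTER TABLE test_flink.test_partition
DROP PARTITION (dt='2021-08-31', xtlx='...', sblx='...');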

Does flink support truncate table?

2021-08-17 Thread Asahi Lee
hi!
Does flink sql provide a truncate table statement? I need to truncate a hive
table from flink but cannot find one.
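
Flink 1.13 has no TRUNCATE TABLE statement. A hedged batch-mode workaround is
an INSERT OVERWRITE whose select produces no rows; whether that actually
clears the existing data is exactly what the next thread asks, so verify on a
throwaway table first:

INSERT OVERWRITE target_table SELECT * FROM target_table WHERE 1 = 0;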

flink 1.13.1: if an INSERT OVERWRITE into a hive table selects zero rows, are the existing data and partitions cleared?

2021-08-17 Thread Asahi Lee
hi,
If the select in the following statement returns zero rows, does the hive
table end up emptied?
INSERT OVERWRITE target_table SELECT * from source_table where id  10;

flink 1.13.1 running hive sql fails with: Can not make a progress: all selected inputs are already finished

2021-08-05 Thread Asahi Lee
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_141]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_141]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_141]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_141]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_141]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_141]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_141]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_141]
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
 [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: FAILED
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: FAILED
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_141]
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_141]
at 
com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.scriptRunning(ScriptEngine.java:200)
 ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?]
at 
com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.execute(ScriptEngine.java:169)
 ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?]
at 
com.meritdata.cloud.tempo.dw.flow.engine.app.ScriptEngine.main(ScriptEngine.java:127)
 ~[tempo-dw-flow-engine-app-0.0.1-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_141]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_141]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_141]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_141]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 

Re: flink 1.13.1 hive dialect: error executing hive sql

2021-08-04 Thread Asahi Lee
After parsing the sql with SqlParser.parseStmtList(), calling toString() on
the resulting SqlNode turns the Chinese characters into unicode escapes.




---- Original message ----
From: "user-zh"



Re: flink 1.13.1 hive dialect: error executing hive sql

2021-08-02 Thread Asahi Lee
hive 1.1.0




---- Original message ----
From: "user-zh"



flink 1.13.1 hive dialect: the like function reports unsupported

2021-07-30 Thread Asahi Lee
hi, running hive sql with like under flink 1.13.1 fails.


Error:
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
like(VARCHAR(255), STRING NOT NULL)


org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
like(STRING, STRING NOT NULL)
If you think this function should be supported, you can create an issue and 
start a discussion for it.
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:845)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:845)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at scala.Option.getOrElse(Option.scala:121) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:844)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:849)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at scala.Option.getOrElse(Option.scala:121) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:837)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) 
~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:173)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:50)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:80)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:79)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:79)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at 
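
The codegen error shows LIKE receiving mismatched operand types (VARCHAR(255)
vs STRING NOT NULL). A hedged workaround: cast the column so both sides are
plain STRING (names are hypothetical).

SELECT * FROM t WHERE CAST(col1 AS STRING) LIKE '%abc%';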

Re: flink 1.13.1 hive dialect: error executing hive sql

2021-07-30 Thread Asahi Lee
CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramvalue`(
 `paramvalue_id` string COMMENT '',
 `platform_id` string COMMENT '',
 `equipment_id` string COMMENT '',
 `param_id` string COMMENT '',
 `param_value` string COMMENT '',
 `remark` string COMMENT '',
 `create_time` string COMMENT '',
 `creator` string COMMENT '',
 `update_time` string COMMENT '',
 `update_person` string COMMENT '',
 `record_flag` double COMMENT '',
 `subject_id` string COMMENT '',
 `output_unit` string COMMENT '',
 `show_seq` double COMMENT '')
COMMENT ''
ROW FORMAT SERDE
 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
 'field.delim'=',',
 'serialization.format'=',')
STORED AS INPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
 
'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramvalue'
TBLPROPERTIES (
 'COLUMN_STATS_ACCURATE'='false',
 'last_modified_by'='root',
 'last_modified_time'='1621834335',
 'numFiles'='0',
 'numRows'='-1',
 'rawDataSize'='-1',
 'totalSize'='0',
 'transient_lastDdlTime'='1621834335')



CREATE TABLE `cosldatacenter.ods_emp_md_large_equip`(
 `large_equip_id` string COMMENT '',
 `equip_name` string COMMENT '',
 `equip_type` string COMMENT '',
 `equip_function` string COMMENT '',
 `equip_board` string COMMENT '',
 `ship_yard` string COMMENT '',
 `manufacturer_date` string COMMENT '',
 `enqueue_date` string COMMENT '',
 `dockrepair_date` string COMMENT '',
 `scrap_date` string COMMENT '',
 `enqueue_mode` string COMMENT '',
 `work_for_org` string COMMENT '',
 `work_in_org` string COMMENT '',
 `old_age` string COMMENT '',
 `create_time` date COMMENT '',
 `creator` string COMMENT '',
 `update_time` date COMMENT '',
 `update_person` string COMMENT '',
 `record_flag` double COMMENT '',
 `data_timestamp` string COMMENT '',
 `work_unit_id` string COMMENT '',
 `work_status` string COMMENT '',
 `work_location` string COMMENT '',
 `work_area` string COMMENT '',
 `equip_code` string COMMENT '',
 `shi_main_power` double COMMENT '',
 `shi_total_len` double COMMENT '',
 `shi_type_width` double COMMENT '',
 `shi_type_depth` double COMMENT '',
 `shi_design_draft` double COMMENT '',
 `shi_total_tonnage` double COMMENT '',
 `shi_load_tonnage` double COMMENT '',
 `remark` string COMMENT '',
 `unit_classification1` string COMMENT '',
 `unit_classification2` string COMMENT '')
COMMENT ''
ROW FORMAT SERDE
 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
 'field.delim'=',',
 'serialization.format'=',')
STORED AS INPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
 
'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_md_large_equip'
TBLPROPERTIES (
 'COLUMN_STATS_ACCURATE'='false',
 'last_modified_by'='root',
 'last_modified_time'='1621834338',
 'numFiles'='0',
 'numRows'='-1',
 'rawDataSize'='-1',
 'totalSize'='0',
 'transient_lastDdlTime'='1621834338')



CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramdef`(
 `param_id` string COMMENT '',
 `iadc_id` string COMMENT '',
 `param_code` string COMMENT '',
 `param_en` string COMMENT '',
 `param_cn` string COMMENT '',
 `output_standard` string COMMENT '',
 `output_unit` string COMMENT '',
 `param_type` string COMMENT '',
 `param_value` string COMMENT '',
 `remark` string COMMENT '',
 `create_time` string COMMENT '',
 `creator` string COMMENT '',
 `update_time` string COMMENT '',
 `update_person` string COMMENT '',
 `record_flag` double COMMENT '')
COMMENT ''
ROW FORMAT SERDE
 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
 'field.delim'=',',
 'serialization.format'=',')
STORED AS INPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
 
'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramdef'
TBLPROPERTIES (
 'COLUMN_STATS_ACCURATE'='false',
 'last_modified_by'='root',
 'last_modified_time'='1621834335',
 'numFiles'='0',
 'numRows'='-1',
 'rawDataSize'='-1',
 'totalSize'='0',
 'transient_lastDdlTime'='1621834335')



CREATE TABLE `cosldatacenter.dw_riginfoparam`(
 `large_equip_id` string,
 `equip_code` string,
 `equip_name` string,
 `enqueue_date` string,
 `shi_total_len` double,
 `shi_type_width` double,
 `shi_type_depth` double,
 `moonpool` string,
 `maxwindvelocity` string,
 `maxwaveheight` string,
 `airgap` string,
 `maxopewaterdepth` string,
 `drilldepthcap` string,
 `drillvl` string,
 `drillwater` string,
 `potablewater` string)
ROW FORMAT SERDE
 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
 'field.delim'=',',
 'serialization.format'=',')
STORED AS INPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
 
'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/dw_riginfoparam'

Re: flink 1.13.1: metric names show up as "Unnamed"

2021-07-30 Thread Asahi Lee
It is a pure SQL job.




---------- Original Message ----------
From: "user-zh"


flink 1.13.1: metric names show up as "Unnamed"

2021-07-29 Thread Asahi Lee
Hi,
When I run a SQL job, the operator names in the reported metrics end with "Unnamed". Is this a bug?
For example:
node103.taskmanager.container_1627469681067_0030_01_02.e621b91ec4a34ababeb6b0e2c4d6f22b.Source:
 HiveSource-qc_test_t_student_score - Calc(select=[id, 
CAST(_UTF-16LE'Bob':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, 
class_id, class_name, score, 
_UTF-16LE'9bdb0e98cc5b4800ae3b56575c442225':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" AS rule_id, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" AS task_batch_id], where=[(name = _UTF-16LE'Bob':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")]) - Map - Sink: 
Unnamed.0.Shuffle.Netty.Input.Buffers.inputFloatingBuffersUsage


node103.taskmanager.container_1627469681067_0030_01_02.e621b91ec4a34ababeb6b0e2c4d6f22b.Source:
 HiveSource-qc_test_t_student_score - Calc(select=[id, 
CAST(_UTF-16LE'Bob':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, 
class_id, class_name, score, 
_UTF-16LE'9bdb0e98cc5b4800ae3b56575c442225':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" AS rule_id, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" AS task_batch_id], where=[(name = _UTF-16LE'Bob':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")]) - Map - Sink: 
Unnamed.0.Shuffle.Netty.Input.Buffers.inPoolUsage



node103.taskmanager.container_1627469681067_0030_01_02.e621b91ec4a34ababeb6b0e2c4d6f22b.Sink:
 Unnamed.0.numRecordsIn


The SQL:
CREATE CATALOG `qc_hive_catalog` WITH ( 'type' = 'hive', 'default-database' = 
'qc_test' );
USE CATALOG `qc_hive_catalog`;
CREATE TABLE
IF
NOT EXISTS QC_RESULT_T_STUDENT_SCORE ( id STRING, NAME STRING, class_id 
STRING, class_name STRING, score INTEGER, rule_id STRING, task_batch_id STRING 
) WITH ( 'is_generic' = 'false', 'connector' = 'hive' );
INSERT INTO QC_RESULT_T_STUDENT_SCORE SELECT
id,
NAME,
class_id,
class_name,
score,
cast( '9bdb0e98cc5b4800ae3b56575c442225' AS STRING ) AS rule_id,
cast( '3' AS STRING ) AS task_batch_id
FROM
t_student_score
WHERE
t_student_score.NAME = 'Bob';

Re: flink 1.13.1: error executing Hive SQL with the Hive dialect

2021-07-29 Thread Asahi Lee
hi!
As soon as the SQL contains a case when ... else ... expression, it fails with "Invalid table alias or column reference 'u'", although there is no 'u' anywhere in my SQL:
CREATE CATALOG `tempo_df_hive_default_catalog` WITH(
  'type' = 'hive',
  'default-database' = 'default'
);
USE CATALOG tempo_df_hive_default_catalog;
CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` (
 f0 INT
);
insert into cosldatacenter.dw_riginfoparam
select
c.LARGE_EQUIP_ID,
c.EQUIP_CODE,
c.EQUIP_NAME,
c.ENQUEUE_DATE,
c.SHI_TOTAL_LEN,
c.SHI_TYPE_WIDTH,
c.SHI_TYPE_DEPTH,
case when b.param_cn = '' then a.param_value else null end as Moonpool,
case when b.param_cn = '' then a.param_value else null end as MaxWindvelocity,
case when b.param_cn = '' then a.param_value else null end as MaxWaveheight,
case when b.param_cn = '' then a.param_value else null end as Airgap,
case when b.param_cn = '' then a.param_value else null end as MaxOpeWaterdepth,
case when b.param_cn = '' then a.param_value else null end as DrilldepthCap,
case when b.param_cn = '' then a.param_value else null end as DrillVL,
case when b.param_cn = '??' then a.param_value else null end as DrillWater,
case when b.param_cn = '??' then a.param_value else null end as PotableWater
from cosldatacenter.ods_emp_maindata_iadc_paramvalue a
inner join cosldatacenter.ods_emp_maindata_iadc_paramdef b on a.param_id = b.param_id
inner join cosldatacenter.ods_emp_md_large_equip c on a.SUBJECT_ID = c.LARGE_EQUIP_ID;
INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ;





org.apache.hadoop.hive.ql.parse.SemanticException: Line 2:178 Invalid table 
alias or column reference 'u': (possible column names are: a.paramvalue_id, 
a.platform_id, a.equipment_id, a.param_id, a.param_value, a.remark, 
a.create_time, a.creator, a.update_time, a.update_person, a.record_flag, 
a.subject_id, a.output_unit, a.show_seq, b.param_id, b.iadc_id, b.param_code, 
b.param_en, b.param_cn, b.output_standard, b.output_unit, b.param_type, 
b.param_value, b.remark, b.create_time, b.creator, b.update_time, 
b.update_person, b.record_flag, c.large_equip_id, c.equip_name, c.equip_type, 
c.equip_function, c.equip_board, c.ship_yard, c.manufacturer_date, 
c.enqueue_date, c.dockrepair_date, c.scrap_date, c.enqueue_mode, 
c.work_for_org, c.work_in_org, c.old_age, c.create_time, c.creator, 
c.update_time, c.update_person, c.record_flag, c.data_timestamp, 
c.work_unit_id, c.work_status, c.work_location, c.work_area, c.equip_code, 
c.shi_main_power, c.shi_total_len, c.shi_type_width, c.shi_type_depth, 
c.shi_design_draft, c.shi_total_tonnage, c.shi_load_tonnage, c.remark, 
c.unit_classification1, c.unit_classification2)
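A hedged workaround sketch, not from the thread: both stack traces point into HiveParserSemanticAnalyzer, i.e. the failure happens while the Hive dialect parses the statement, and the CASE WHEN ... ELSE NULL END pattern itself is standard SQL. One thing to try is running the statement with Flink's default dialect, which can still read and write the Hive catalog tables; whether that sidesteps this particular parser error is an assumption.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class DialectWorkaroundSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // Parse with Flink's default dialect instead of SqlDialect.HIVE;
        // the failing CASE WHEN ... ELSE NULL END expression is standard SQL.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("SELECT CASE WHEN 'a' = 'b' THEN 'x' ELSE NULL END AS moonpool").print();
    }
}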




---------- Original Message ----------
From: "user-zh"


flink 1.13.1: error executing Hive SQL with the Hive dialect

2021-07-27 Thread Asahi Lee
Hi,
On flink 1.13.1 I get an error executing the following Hive SQL:
CREATE CATALOG `tempo_df_hive_default_catalog` WITH(
  'type' = 'hive',
  'default-database' = 'default'
);
USE CATALOG tempo_df_hive_default_catalog;
CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` (
 f0 INT
);
use cosldatacenter;
INSERT INTO `dw_riginfoparam` ( `large_equip_id`, `equip_code`, `equip_name`, 
`enqueue_date`, `shi_total_len`, `shi_type_width`, `shi_type_depth`, `moonpool` 
) SELECT
mle.`large_equip_id` ,
mle.`equip_code` ,
mle.`equip_name` ,
mle.`enqueue_date` ,
mle.`shi_total_len` ,
mle.`shi_type_width` ,
mle.`shi_type_depth`,
CASE
WHEN mipd.`param_cn` = '' THEN
mipv.`param_value`
END AS `Moonpool`
from `ods_emp_maindata_iadc_paramvalue` mipv
INNER JOIN `ods_emp_maindata_iadc_paramdef` mipd ON mipv.`param_id` = 
mipd.`param_id`
 inner JOIN `ods_emp_md_large_equip` mle ON mipv.`SUBJECT_ID`= 
mle.`LARGE_EQUIP_ID`;
INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ;



The error:
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 2:195 
Invalid table alias or column reference 'u': (possible column names are: 
mipv.paramvalue_id, mipv.platform_id, mipv.equipment_id, mipv.param_id, 
mipv.param_value, mipv.remark, mipv.create_time, mipv.creator, 
mipv.update_time, mipv.update_person, mipv.record_flag, mipv.subject_id, 
mipv.output_unit, mipv.show_seq, mipd.param_id, mipd.iadc_id, mipd.param_code, 
mipd.param_en, mipd.param_cn, mipd.output_standard, mipd.output_unit, 
mipd.param_type, mipd.param_value, mipd.remark, mipd.create_time, mipd.creator, 
mipd.update_time, mipd.update_person, mipd.record_flag, mle.large_equip_id, 
mle.equip_name, mle.equip_type, mle.equip_function, mle.equip_board, 
mle.ship_yard, mle.manufacturer_date, mle.enqueue_date, mle.dockrepair_date, 
mle.scrap_date, mle.enqueue_mode, mle.work_for_org, mle.work_in_org, 
mle.old_age, mle.create_time, mle.creator, mle.update_time, mle.update_person, 
mle.record_flag, mle.data_timestamp, mle.work_unit_id, mle.work_status, 
mle.work_location, mle.work_area, mle.equip_code, mle.shi_main_power, 
mle.shi_total_len, mle.shi_type_width, mle.shi_type_depth, 
mle.shi_design_draft, mle.shi_total_tonnage, mle.shi_load_tonnage, mle.remark, 
mle.unit_classification1, mle.unit_classification2)
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genAllExprNodeDesc(HiveParserSemanticAnalyzer.java:2467) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genExprNodeDesc(HiveParserSemanticAnalyzer.java:2421) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genSelectLogicalPlan(HiveParserCalcitePlanner.java:2314) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:2772) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.logicalPlan(HiveParserCalcitePlanner.java:285) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:273) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:326) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:274) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217) ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]

Re: flink 1.13.1 SQL: table created with is_generic = false is not a usable Hive table

2021-07-26 Thread Asahi Lee
The same happens with flink 1.12.0.




---------- Original Message ----------
From: "user-zh"


flink 1.13.1 SQL: table created with is_generic = false is not a usable Hive table

2021-07-26 Thread Asahi Lee
hi!
On flink 1.13.1 I create a table in a Hive catalog through SQL.
Hive is 2.3.6, with the flink-sql-connector-hive-2.3.6 dependency:


package com.meritdata.cloud.flink.test;


import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;


public class Test {


  public static void main(String[] args) {


        EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnvironment = TableEnvironment.create(bbSettings);

        tableEnvironment.executeSql("create catalog myhive with (" +
                " 'type' = 'hive', " +
                " 'default-database' = 'default' " +
                ")");

        tableEnvironment.executeSql("use catalog myhive");
        tableEnvironment.executeSql("create table if not exists q1 " +
                "( id string ) " +
                "with ('is_generic' = 'false')");


        /**
         * In Hive, "desc formatted q1;" then shows:
         *
         * col_name        data_type        comment
         *
         * Table Parameters:
         *   flink.is_generic           false
         *   flink.schema.0.data-type   VARCHAR(2147483647)
         *   flink.schema.0.name        id
         *   transient_lastDdlTime      1627279802
         */


  }




}
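A hedged sketch, not from the thread: in 1.13 the documented way to get a table that Hive itself can use is to create it with the Hive dialect rather than the legacy 'is_generic' property; whether that resolves the behaviour above is an assumption. The snippet continues the main method of the Test class, reusing its tableEnvironment and the table name q1.

        // requires: import org.apache.flink.table.api.SqlDialect;
        tableEnvironment.executeSql("use catalog myhive");
        // DDL parsed by the Hive dialect creates a native Hive table,
        // so Hive sees ordinary table metadata instead of flink.* properties
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnvironment.executeSql("create table if not exists q1 (id string)");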

flink 1.13.1: how to name the fields of row(a, b)?

2021-07-22 Thread Asahi Lee
hi,
1. In flink 1.13.1, row(a, b) produces the field names EXPR$0/EXPR$1, and the query over that result below then fails. Is this a bug?
2. Is there a way to give the fields of a row constructor explicit names, e.g. c1 and c2?
The program:

package test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;

public class DataGenTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);

        tableEnvironment.executeSql("CREATE TABLE datagen (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")");

        Table table = tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from datagen");
        ResolvedSchema resolvedSchema = table.getResolvedSchema();
        System.out.println(resolvedSchema);
        /**
         * Printed schema:
         * (
         *   `c` ROW<`EXPR$0` INT, `EXPR$1` INT> NOT NULL
         * )
         * Question: can the fields of the row be named, e.g. c1 and c2, so that the schema becomes:
         * (
         *   `c` ROW<`c1` INT, `c2` INT> NOT NULL
         * )
         */

        Table table1 = tableEnvironment.sqlQuery("select * from " + table);
        /**
         * This query fails:
         * Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
         * validated type:
         * RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
         * converted type:
         * RecordType(RecordType(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
         * rel:
         * LogicalProject(c=[ROW($0, $1)])
         *   LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
         *     LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP])
         *       LogicalTableScan(table=[[default_catalog, default_database, datagen]])
         */
        ResolvedSchema resolvedSchema1 = table1.getResolvedSchema();
        System.out.println(resolvedSchema1);

        table.execute().print();
    }
}
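A hedged workaround sketch, not from the thread: the fields of a row constructor can be renamed by casting to a named ROW type; whether this also sidesteps the AssertionError above is an assumption. The snippet continues the main method, reusing tableEnvironment and the datagen table.

        // CAST to a named ROW type replaces the generated EXPR$0/EXPR$1 field names
        Table named = tableEnvironment.sqlQuery(
                "select cast(row(f_sequence, f_random) as row<c1 int, c2 int>) as c from datagen");
        System.out.println(named.getResolvedSchema());
        // expected schema: ( `c` ROW<`c1` INT, `c2` INT> )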

flink 1.13.1: why doesn't org.apache.flink.table.catalog.Column implement Serializable?

2021-06-14 Thread Asahi Lee
hi,
Why doesn't org.apache.flink.table.catalog.Column implement Serializable? Could it be made Serializable?

Re: does Flink SQL support writing to DynamoDB?

2021-06-14 Thread Asahi Lee
https://flink-packages.org/packages/streaming-flink-dynamodb-connector




---------- Original Message ----------
From: "user-zh"


flink 1.13.0: can schema.name be used in the Flink SQL JDBC connector's table-name?

2021-05-19 Thread Asahi Lee
hi!
Can the flink jdbc connector's table-name option carry a schema qualifier, as below?
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'org.users'
);
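A hedged note with a sketch, not from the thread: MySQL has no schema level separate from the database, so the database part belongs in the JDBC URL and table-name stays unqualified; a dotted 'table-name' like 'org.users' appears to be quoted as a single identifier by the MySQL dialect, which is exactly what breaks. The database name org below is hypothetical, and whether other dialects accept a schema-qualified table-name in 1.13 is an assumption.

CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/org',  -- the database goes into the URL
  'table-name' = 'users'                      -- unqualified table name
);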

Flink CEP: can CEP patterns be updated without restarting the flink job?

2021-04-15 Thread Asahi Lee
hi,
Does flink cep support loading or updating CEP patterns dynamically?

Question: flink and data sources such as kafka and mysql

2021-03-14 Thread Asahi Lee
??
   ??flink 
??

flink 1.12.2: batch execution via TableEnvironment and via the DataStream BATCH mode give different results

2021-03-09 Thread Asahi Lee
With flink 1.12, running the same query through the DataStream API with RuntimeExecutionMode.BATCH produces a changelog instead of the final aggregate that the batch TableEnvironment produces; see the program below.

package com.meritdata.cloud.tempo.dw.flink.test.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JDBCTest {

    public static void main(String[] args) {
        test();
        /**
         * Output of test() (batch mode via TableEnvironment):
         * EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
         *         .useBlinkPlanner().inBatchMode().build();
         * TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
         * +----+----------+
         * |  a |   EXPR$1 |
         * +----+----------+
         * |  2 |        1 |
         * |  3 |        2 |
         * |  1 |        2 |
         * |  4 |        1 |
         * +----+----------+
         */
        //test1();

        /**
         * Output of test1() (DataStream API in BATCH runtime mode):
         * StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         * streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
         * StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
         * +----+----+----------+
         * | op |  a |   EXPR$1 |
         * +----+----+----------+
         * | +I |  2 |        1 |
         * | +I |  1 |        1 |
         * | +I |  4 |        1 |
         * | -U |  1 |        1 |
         * | +U |  1 |        2 |
         * | +I |  3 |        1 |
         * | -U |  3 |        1 |
         * | +U |  3 |        2 |
         * +----+----+----------+
         */
    }

    public static void test() {
        EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

        bbTableEnv.executeSql("CREATE TABLE ab (" +
                "  a STRING, " +
                "  b INT " +
                ") WITH (" +
                "   'connector' = 'jdbc'," +
                "   'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
                "   'username' = 'root'," +
                "   'password' = 'root'," +
                "   'table-name' = 'ab'" +
                " )");

        bbTableEnv.sqlQuery("select a, count(b) from ab group by a").execute().print();
    }

    public static void test1() {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        tableEnv.executeSql("CREATE TABLE ab (" +
                "  a STRING, " +
                "  b INT " +
                ") WITH (" +
                "   'connector' = 'jdbc'," +
                "   'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
                "   'username' = 'root'," +
                "   'password' = 'root'," +
                "   'table-name' = 'ab'" +
                " )");

        tableEnv.sqlQuery("select a, count(b) from ab group by a").execute().print();
    }

}

flink 1.12.2: how to specify a rowtime attribute when converting a DataStream into a Table?

2021-03-06 Thread Asahi Lee
Hi,
My code:
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);

DataStream

FlinkUserCodeClassLoader: how are external jars loaded in session mode?

2021-01-21 Thread Asahi Lee
Hi,
I am on flink-1.12.0. In my program's main method I load an external jar (http://a.jar) through a URLClassLoader, and I submit an uber-jar through the rest api jar/run endpoint. How does FlinkUserCodeClassLoader resolve the classes of the uber-jar in a session cluster?

Table program cannot be compiled

2021-01-18 Thread Asahi Lee
Using flink 1.12.0, org.apache.flink.table.runtime.generated.CompileUtils.compile() fails with "Table program cannot be compiled. This is a bug. Please file an issue." while compiling the generated implementation of public abstract java.lang.Object org.apache.flink.api.java.functions.KeySelector.getKey(java.lang.Object). What could be the cause?

Re: flink 1.11.2: Interval Join between a rowtime and a proctime attribute fails

2020-11-25 Thread Asahi Lee
Hi,
So an interval join cannot mix a rowtime attribute with a proctime attribute?




---------- Original Message ----------
From: "user-zh" <18868816...@163.com>
Date: 2020-11-25 (Wed) 7:31
To: "user-zh"

flink 1.11.2: Interval Join between a rowtime and a proctime attribute fails

2020-11-25 Thread Asahi Lee
I am trying an Interval Join on flink 1.11.2 between a rowtime attribute and a proctime attribute; both join conditions fail:
l_table.l_rt = r_table.r_pt, and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND. The program:

package join;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test1 {

public static void main(String[] args) {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);

String lTable = "CREATE TABLE l_table (  " +
" l_a INT,  " +
" l_b string,  " +
" l_rt AS localtimestamp,  " +
" WATERMARK FOR l_rt AS l_rt  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='5',  " +
" 'fields.l_a.min'='1',  " +
" 'fields.l_a.max'='5',  " +
" 'fields.l_b.length'='5'  " +
")";
bsTableEnv.executeSql(lTable);

String rTable = "CREATE TABLE r_table (  " +
" r_a INT,  " +
" r_b string,  " +
" r_pt AS proctime()  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='5',  " +
" 'fields.r_a.min'='1',  " +
" 'fields.r_a.max'='5',  " +
" 'fields.r_b.length'='5'  " +
")";
bsTableEnv.executeSql(rTable);

String printTable = "CREATE TABLE print (" +
"  l_a INT,  " +
"  l_b string,  " +
"  l_rt timestamp(3),  " +
"  r_a INT,  " +
"  r_b string,  " +
"  r_pt timestamp(3)  " +
") WITH (  " +
" 'connector' = 'print' " +
") ";

bsTableEnv.executeSql(printTable);

        // equality join on the two time attributes:
        //Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");

        // Error: Rowtime attributes must not be in the input rows of a regular join.
        // As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");

bsTableEnv.executeSql("insert into print select * from " + joinTable);

}

}
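A hedged sketch, not from the thread: Flink's interval joins need the time attributes on both inputs to be of the same kind (both event time or both processing time), so one option is to give the right side an event-time attribute as well. The table r_table2 below is my addition; the snippet continues the main method above.

        // assumption: put both sides on event time so the BETWEEN condition
        // is recognized as an interval join rather than a regular join
        String rTable2 = "CREATE TABLE r_table2 (  " +
                " r_a INT,  " +
                " r_b string,  " +
                " r_rt AS localtimestamp,  " +
                " WATERMARK FOR r_rt AS r_rt  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.r_a.min'='1',  " +
                " 'fields.r_a.max'='5',  " +
                " 'fields.r_b.length'='5'  " +
                ")";
        bsTableEnv.executeSql(rTable2);

        Table intervalJoin = bsTableEnv.sqlQuery("select * from l_table join r_table2" +
                " on l_table.l_a = r_table2.r_a" +
                " and l_table.l_rt BETWEEN r_table2.r_rt - INTERVAL '10' SECOND" +
                " AND r_table2.r_rt + INTERVAL '5' SECOND");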

Re: flink 1.11.2 timestamp time zone issue

2020-11-14 Thread Asahi Lee
When the format is json, does Flink's bundled jackson conflict with the jackson dependency of my own job?




---------- Original Message ----------
From: "Asahi Lee" <978466...@qq.com>
Date: 2020-11-14 (Sat) 4:58
To: "user-zh"

flink 1.11.2 timestamp time zone issue

2020-11-14 Thread Asahi Lee
With flink sql 1.11.2, proctime() returns UTC time while I need +08:00. Setting the JVM time zone to +08 through env.java.opts does not change it, and the json output still carries UTC timestamps.
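A hedged sketch, not from the thread: in 1.11 the UTC result of proctime() is independent of the JVM time zone; since Flink 1.13 (FLIP-162) the time functions follow the session time zone, which can be set as below. Treating the upgrade as the fix is an assumption.

import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionTimeZoneSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // session time zone ('table.local-time-zone'); from 1.13 on,
        // PROCTIME()/CURRENT_TIMESTAMP render in this zone
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    }
}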

Re: flink 1.11.2: does the Blink planner provide a BatchTableEnvironment?

2020-11-13 Thread Asahi Lee
I need a BatchTableEnvironment to convert table to dataset and dataset to table.




---------- Original Message ----------
From: "user-zh"


Re: flink 1.11.2: does the Blink planner provide a BatchTableEnvironment?

2020-11-09 Thread Asahi Lee
How can I obtain a BatchTableEnvironment?




---------- Original Message ----------
From: "user-zh"


flink 1.11.2: does the Blink planner provide a BatchTableEnvironment?

2020-11-08 Thread Asahi Lee
Hi,
I use flink 1.11.2 with the Blink planner in batch mode:

// ********** BLINK BATCH QUERY **********
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

Is there a BatchTableEnvironment for the Blink planner's batch mode?
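A hedged sketch, not from the thread: the Blink planner has no BatchTableEnvironment; in 1.11 the Table/DataSet bridge exists only in the legacy planner, roughly as below (assumes flink-table-api-java-bridge and the legacy planner on the classpath).

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class LegacyBatchBridgeSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); // legacy planner only

        DataSet<String> words = env.fromElements("a", "b", "c");
        Table t = tEnv.fromDataSet(words, "word");        // DataSet -> Table
        DataSet<Row> rows = tEnv.toDataSet(t, Row.class); // Table -> DataSet
        rows.print();
    }
}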

Re: 1.11.1 OutOfMemoryError: Metaspace

2020-11-05 Thread Asahi Lee
Hi,
I am on flink 1.11.2 with the MySQL driver below:
/*
  Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.

  The MySQL Connector/J is licensed under the terms of the GPLv2
  <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>, like most MySQL
Connectors.
  There are special exceptions to the terms and conditions of the GPLv2 as it 
is applied to
  this software, see the FOSS License Exception
  <http://www.mysql.com/about/legal/licensing/foss-exception.html>.

  This program is free software; you can redistribute it and/or modify it under 
the terms
  of the GNU General Public License as published by the Free Software 
Foundation; version 2
  of the License.

  This program is distributed in the hope that it will be useful, but WITHOUT 
ANY WARRANTY;
  without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
PARTICULAR PURPOSE.
  See the GNU General Public License for more details.

  You should have received a copy of the GNU General Public License along with 
this
  program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, 
Fifth
  Floor, Boston, MA 02110-1301  USA

 */

package com.mysql.cj.jdbc;

import java.lang.ref.Reference;

import com.mysql.cj.jdbc.NonRegisteringDriver.ConnectionPhantomReference;

public class AbandonedConnectionCleanupThread extends Thread {
private static boolean running = true;
private static Thread threadRef = null;

public AbandonedConnectionCleanupThread() {
super("Abandoned connection cleanup thread");
}

@Override
public void run() {
threadRef = this;
while (running) {
try {
                Reference
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size
[2] https://issues.apache.org/jira/browse/FLINK-16681


Best,
Hailong Wang
At 2020-11-04 19:08:37, "Asahi Lee" <978466...@qq.com> wrote:
I run flink sql jobs that use the JDBC connector against MySQL. After repeated job submissions the TaskManager fails with OutOfMemoryError: Metaspace; the mysql connection classes apparently cannot be unloaded.

1.11.1 OutOfMemoryError: Metaspace

2020-11-04 Thread Asahi Lee
Hi,
I run flink sql jobs that use the JDBC connector against MySQL. After repeated job submissions the TaskManager fails with OutOfMemoryError: Metaspace; it looks like the mysql connection classes cannot be unloaded.
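A hedged sketch, not from the thread, following the config link [1] and FLINK-16681 quoted above: the usual mitigations are to put the JDBC driver jar into flink/lib, so the driver classes are loaded once by the parent classloader instead of once per submitted job, and/or to raise the metaspace limit. The value below is an arbitrary example, not a recommendation.

# flink-conf.yaml
taskmanager.memory.jvm-metaspace.size: 512m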

flink 1.11.1 web UI: the source operator's detail page always shows 0 for Records Sent

2020-11-04 Thread Asahi Lee
Hi,
In the flink web UI, the source operator's detail page always shows 0 for Records Sent. Why is that?

Re: flink 1.11.2 Table SQL: with parallelism 2, the data is not redistributed across subtasks

2020-09-27 Thread Asahi Lee
Does flink SQL have an equivalent of rebalance?
---------- Original Message ----------
From: zilong xiao

flink 1.11.2 Table SQL: with parallelism 2, the data is not redistributed across subtasks

2020-09-27 Thread Asahi Lee
Hi,
I use flink sql to consume kafka. The topic has a single partition, and with the job parallelism set to 2 all data goes through one subtask. With the stream api I could call rebalance(); is there an equivalent for the sql api?

Re: 1.11: printing a table registers more and more unnamed tables?

2020-09-07 Thread Asahi Lee
Hi,
Each time I call StreamTableEnvironment.from("datagen") and print the resulting table, another UnnamedTable$n entry shows up in the catalog (see the program and output below).


The program:
package org.apache.flink.playgrounds.spendreport;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import java.util.Arrays;


public class Test2 {


  public static void main(String[] args) {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);


String sourceDDL = "CREATE TABLE datagen ( " +
" f_random INT," +
" f_random_str STRING " 
+
" ) WITH ( " +
" 'connector' = 
'datagen', " +
" 
'rows-per-second'='10'," +
" 
'fields.f_random.min'='1', " +
" 
'fields.f_random.max'='10', " +
" 
'fields.f_random_str.length'='10' " +
" ) ";
bsTableEnv.executeSql(sourceDDL);
        for (int i = 0; i < 10; i++) {
            Table datagen = bsTableEnv.from("datagen");
            System.out.println(datagen);
        }
        System.out.println("-" + Arrays.toString(bsTableEnv.listTables()));


  }


}



UnnamedTable$0
UnnamedTable$1
UnnamedTable$2
UnnamedTable$3
UnnamedTable$4
UnnamedTable$5
UnnamedTable$6
UnnamedTable$7
UnnamedTable$8
UnnamedTable$9
-[UnnamedTable$0, UnnamedTable$1, UnnamedTable$2, UnnamedTable$3, UnnamedTable$4, UnnamedTable$5, UnnamedTable$6, UnnamedTable$7, UnnamedTable$8, UnnamedTable$9, datagen]

1.11: printing a table registers more and more unnamed tables?

2020-09-04 Thread Asahi Lee
Hi,
A question about StreamTableEnvironment.from(...). The program:

package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test3 {

public static void main(String[] args) {
        // environment setup
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // source table DDL
        String inTablePath = "CREATE TABLE datagen (  " +
                " id INT,  " +
                " total string,  " +
                " ts AS localtimestamp,  " +
                " WATERMARK FOR ts AS ts  " +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.id.min'='1',  " +
                " 'fields.id.max'='10',  " +
                " 'fields.total.length'='10'  " +
                ")";
        // create the source table
        bsTableEnv.executeSql(inTablePath);

        Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from datagen");
        bsTableEnv.createTemporaryView("table1", table);

        Table table1 = bsTableEnv.from("table1");
        System.out.println(table1);
        // prints: table1

        Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from table1");
        System.out.println(queryT.getSchema());

        bsTableEnv.sqlQuery("select table1.id from " + bsTableEnv.from("table1"));

}

}

Re: 1.11: how to set the job name when submitting with TableEnvironment.executeSql("insert into ...")?

2020-08-23 Thread Asahi Lee
For a pure SQL job, how can the job name be set?




---------- Original Message ----------
From: "user-zh"


1.11: how to set the job name when submitting with TableEnvironment.executeSql("insert into ...")?

2020-08-20 Thread Asahi Lee
Hi,
How do I set the name of the job submitted by executeSql("insert into ...")?

My code:
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);

String sourceDDL = "CREATE TABLE datagen (  " +
" f_random INT,  " +
" f_random_str STRING,  " +
" ts AS localtimestamp,  " +
" WATERMARK FOR ts AS ts  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='10',  " +
" 'fields.f_random.min'='1',  " +
" 'fields.f_random.max'='5',  " +
" 'fields.f_random_str.length'='10'  " +
")";

bsTableEnv.executeSql(sourceDDL);
Table datagen = bsTableEnv.from("datagen");

System.out.println(datagen.getSchema());

String sinkDDL = "CREATE TABLE print_table (" +
" f_random int," +
" c_val bigint, " +
" wStart TIMESTAMP(3) " +
") WITH ('connector' = 'print') ";
bsTableEnv.executeSql(sinkDDL);

System.out.println(bsTableEnv.from("print_table").getSchema());

Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by TUMBLE(ts, INTERVAL '5' second), f_random");
bsTableEnv.executeSql("insert into print_table select * from " + table);
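A hedged sketch, not from the thread: the generic pipeline.name option names the submitted job; that it takes effect for jobs submitted through executeSql("insert into ...") on 1.11 is an assumption (newer releases document this behaviour). The snippet continues the code above; the job name is made up.

// set before the INSERT is submitted
bsTableEnv.getConfig().getConfiguration().setString("pipeline.name", "datagen-to-print-job");
bsTableEnv.executeSql("insert into print_table select * from " + table);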

1.11: Cannot have more than one execute() or executeAsync() call in a single environment

2020-08-18 Thread Asahi Lee
hello!
My flink program runs from IDEA, but on the cluster the job manager reports "Cannot have more than one execute() or executeAsync() call in a single environment".
It is thrown from org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(), line 139. What causes this?

datagen connector: why does the windowed aggregation below print no rows?

2020-07-29 Thread Asahi Lee
1. The program:

package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenTest {

public static void main(String[] args) {

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);

String sourceTableDDL = "CREATE TABLE datagen ( " +
" f_random INT, " +
" f_random_str STRING, " +
" ts AS localtimestamp, " +
" WATERMARK FOR ts AS ts " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='20', " +
" 'fields.f_random.min'='1', " +
" 'fields.f_random.max'='10', " +
" 'fields.f_random_str.length'='10' " +
")";

bsTableEnv.executeSql(sourceTableDDL);

bsTableEnv.executeSql("SELECT f_random, count(1) " +
"FROM datagen " +
"GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

}

}

2. The output:
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
+----------+----------+
| f_random |   EXPR$1 |
+----------+----------+

Re: flink 1.11 DDL SQL: using PROCTIME() fails when reading csv

2020-07-23 Thread Asahi Lee
filesystemcsv??
filesystem




---------- Original Message ----------
From: "user-zh"

https://issues.apache.org/jira/browse/FLINK-18665

On 2020-07-23 at 00:07, Asahi Lee <978466...@qq.com> wrote:
 

flink 1.11 DDL SQL: using PROCTIME() fails when reading csv

2020-07-22 Thread Asahi Lee
1. The program:

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

String sourceTableDDL = "CREATE TABLE fs_table (" +
        " user_id STRING," +
        " order_amount DOUBLE," +
        " dt TIMESTAMP(3)," +
        " pt AS PROCTIME() " +
        " ) WITH (" +
        " 'connector'='filesystem'," +
        " 'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        " 'format'='csv'" +
        " )";

bsTableEnv.executeSql(sourceTableDDL);
bsTableEnv.executeSql("select * from fs_table").print();

2. The csv file (order.csv):

zhangsan,12.34,2020-08-03 12:23:50
lisi,234.67,2020-08-03 12:25:50
wangwu,57.6,2020-08-03 12:25:50
zhaoliu,345,2020-08-03 12:28:50



3. The error:

Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
	... 5 more
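A hedged workaround sketch, not from the thread: per the RuntimeException, the CSV reader expected 4 fields because the computed pt column was counted (the bug tracked as FLINK-18665 in the reply above). Until the fix, declaring only the three physical columns makes the expected arity match the file; dropping the proctime column is the trade-off, and that this suffices is an assumption.

String sourceTableDDL = "CREATE TABLE fs_table (" +
        " user_id STRING," +
        " order_amount DOUBLE," +
        " dt TIMESTAMP(3)" +      // physical columns only: 3 fields, matching the csv rows
        " ) WITH (" +
        " 'connector'='filesystem'," +
        " 'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        " 'format'='csv'" +
        " )";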