??????????hive table read
      
blinkStreamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled",
 true);
      Table table=blinkStreamTableEnv.sqlQuery("SELECT * FROM 
test.table_config /*+ 
OPTIONS('streaming-source.enable'='true','streaming-source.monitor-interval' = 
'30 min')*/");


??????????????flink??????
                        // given that the parallelism of the function is 1, we 
can only have 1 or 0 retrieved items.
                        // the 0 is for the case that we are migrating from a 
previous Flink version.
 
                        Preconditions.checkArgument(retrievedStates.size() <= 1,
                                getClass().getSimpleName() + " retrieved 
invalid state.");
 
                        if (retrievedStates.size() == 1 &amp;&amp; 
globalModificationTime != Long.MIN_VALUE) {
                                // this is the case where we have both legacy 
and new state.
                                // The two should be mutually exclusive for the 
operator, thus we throw the exception.
 
                                throw new IllegalArgumentException(
                                        "The " + getClass().getSimpleName() + " 
has already restored from a previous Flink version.");
 
                        } else if (retrievedStates.size() == 1) {
                                this.globalModificationTime = 
retrievedStates.get(0);
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("{} retrieved a global mod 
time of {}.",
                                                getClass().getSimpleName(), 
globalModificationTime);
                                }
                        }



------------------&nbsp;????????&nbsp;------------------
??????:                                                                         
                                               "Excalibur"                      
                                                              
<972263...@qq.com&gt;;
????????:&nbsp;2020??11??16??(??????) ????10:54
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;flink sql hive streaming??????????????????bug



??????1.11.2
??????????java.lang.IllegalArgumentException: 
The&nbsp;ContinuousFileMonitoringFunction&nbsp;has already restored from&nbsp;a 
previous Flink&nbsp;version.
&nbsp; &nbsp; at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
&nbsp; &nbsp; at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
&nbsp; &nbsp; at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
&nbsp; &nbsp; at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
&nbsp; &nbsp; at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
&nbsp; &nbsp; at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
&nbsp; &nbsp; at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
&nbsp; &nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
&nbsp; &nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
&nbsp; &nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
&nbsp; &nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
&nbsp; &nbsp; at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
&nbsp; &nbsp; at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
&nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)

Reply via email to