Re: REST API in an HA setup - must the leading JM be called?
Thank you, that answers my questions. -- Regards, Juha On Wed, Aug 18, 2021 at 2:28 PM Chesnay Schepler wrote: > You've pretty much answered the question yourself. *thumbs up* > > For the vast majority of cases you can call any JobManager. > The exceptions are jar operations (because they are persisted in the > JM-local filesystem, and other JMs don't know about them) and triggering > savepoints (because metadata for on-going savepoint operations (i.e., the > information returned when querying the savepoint operation status) is also > kept locally in the JM). > > This does indeed imply that on JM failover all this information is lost. > > There are ideas to solve this, but no concrete timeline. See > https://issues.apache.org/jira/browse/FLINK-18312 > > On 18/08/2021 11:54, Juha Mynttinen wrote: > > I have questions related to REST API in the case of ZooKeeper HA and a > standalone cluster. But I think the questions apply to other setups too > such as YARN. > > Let's assume a standalone cluster with multiple JobManagers. The > JobManagers elect the leader among themselves and register that to > ZooKeeper. When using the Flink command line, AFAIK the code will go to > ZooKeeper to find the host and port of the leading JobManager and send HTTP > requests there. > > My question is: when accessing the REST API directly (e.g. curl) does one > need to call the leading JobManager or will any up and running JobManager > do? And if the leader needs to be called, why is it so? > > Behind the scenes the REST API will connect to the leading "JobManager" > over RPC, making it irrelevant which JobManager receives the HTTP request. > > By experimenting, I found the Web UI works fine if all the JobManagers are > behind a load balancer and leading and standby JobManagers are called. The > only issue I found was that when a jar is submitted (/jars/upload), it is > stored on the local disk of the JobManager that happens to handle that > request.
As a consequence, creating a job from that jar only succeeds if > the HTTP request hits the JobManager that has the file. There might be a > "hack" to overcome this limitation: set web.upload.dir to a location in S3 / GCS or > elsewhere that is accessible to all JobManagers. I didn't try this. Or, in the case > of uploading jars and creating jobs, ensure the same JobManager is called > (bypass the load balancer). > > But I wonder whether there is some other reason why the leading JM should be called. > > A follow-up question arises. If the jars are stored only on the leading > JobManager, doesn't that mean that if the leader changes, the new leader is > not aware of the jars uploaded to the old leader? From the REST > API's perspective this means that even in the JobManager HA setup and when > always calling the leader, a simple "upload a jar and deploy a job" cycle > is not guaranteed to work if the leader happens to change between the > requests. Did I miss something? > > -- > Regards, > Juha
failures during job start
Any help with this would be appreciated. Is it possible that this is a data/application issue or a Flink config/resource issue? Using Flink 1.11.2, Java 11, session cluster, 5 nodes with 32 cores each. I have an issue where starting a job takes a long time and sometimes fails with PartitionNotFoundException, but succeeds on restart. The job has 10 Kafka sources (10 partitions for each topic) and parallelism 5. The failure does not happen when the Kafka logs are empty. Note that during the scenario below, CPU usage on the TaskManagers and JobManagers is low (below 30%). The scenario we see:
* We send a request to load and run a jar; the job appears on the dashboard with all 160 subtasks in the Deploying state.
* After 2 minutes, some subtasks start transitioning to Running.
* After another 30 seconds, a failure occurs and the job goes into the Restarting state.
* After another minute, the restart completes and all nodes are running.
Exception history shows:
2021-08-15 07:55:02 org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec not found.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
    at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Pre-shuffle aggregation in Flink is not working
I am trying to do pre-shuffle aggregation in Flink. Following is my MapBundleFunction implementation:

public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}

I am using CountBundleTrigger.java, but the pre-shuffle aggregation is not working because the count variable is always 0. Please let me know if I am missing something.

@Override
public void onElement(T element) throws Exception {
    count++;
    if (count >= maxCount) {
        callback.finishBundle();
        reset();
    }
}

Here is the main code:

MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
    @Override
    public Long getKey(TaxiFare value) throws Exception {
        return value.driverId;
    }
};

DataStream<Tuple3<Long, Long, Float>> hourlyTips =
        // fares.keyBy((TaxiFare fare) -> fare.driverId)
        //         .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
        fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
                        new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                            @Override
                            public long extractTimestamp(TaxiFare element) {
                                // Flink timestamps are epoch milliseconds, not seconds
                                return element.startTime.toEpochMilli();
                            }
                        })
                .keyBy((TaxiFare fare) -> fare.driverId)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new AddTips());

DataStream<Tuple3<Long, Long, Float>> hourlyMax =
        hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

Thanks, Suman
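For reference, here is a stdlib-only Java model (not Flink code, and not the Flink classes themselves) of what a MapBundleFunction plus CountBundleTrigger pair is meant to do: combine records per key locally, and flush one pre-aggregated record per key every maxCount elements. The class and method names are hypothetical. In this model the counter only advances because processElement calls onElement for every record, so with a count stuck at 0 one thing worth checking is whether the real operator ever reaches the trigger's onElement call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of count-triggered pre-shuffle (local) aggregation.
// NOT Flink code: it only mimics the addInput / onElement / finishBundle contract.
public class CountBundleModel {
    private final long maxCount;
    // driverId -> summed tip (plays the role of the bundle Map<Long, TaxiFare>)
    private final Map<Long, Float> bundle = new LinkedHashMap<>();
    private final List<Map.Entry<Long, Float>> emitted = new ArrayList<>();
    private long count = 0;

    public CountBundleModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Per element: combine into the bundle (addInput), then notify the trigger.
    public void processElement(long driverId, float tip) {
        bundle.merge(driverId, tip, Float::sum); // absent -> tip, otherwise old + tip
        onElement();
    }

    // Same logic as the CountBundleTrigger#onElement snippet quoted above.
    private void onElement() {
        count++;
        if (count >= maxCount) {
            finishBundle();
            count = 0; // reset()
        }
    }

    // Emit one pre-aggregated record per key, then start an empty bundle.
    public void finishBundle() {
        for (Map.Entry<Long, Float> e : bundle.entrySet()) {
            emitted.add(Map.entry(e.getKey(), e.getValue()));
        }
        bundle.clear();
    }

    public long count() {
        return count;
    }

    public List<Map.Entry<Long, Float>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountBundleModel op = new CountBundleModel(3);
        op.processElement(1L, 1.0f);
        op.processElement(2L, 2.0f);
        op.processElement(1L, 3.0f); // 3rd element: bundle is flushed
        op.processElement(2L, 4.0f); // buffered until the next flush
        System.out.println(op.emitted()); // [1=4.0, 2=2.0]
        op.finishBundle(); // a final flush, e.g. at end of input
        System.out.println(op.emitted()); // [1=4.0, 2=2.0, 2=4.0]
    }
}
```

The downstream keyBy then only sees one record per driver per bundle instead of one per fare, which is the whole point of the pre-shuffle step.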
Timer Service vs Custom Triggers
My use case is that I'm producing a set of measurements by key every 60 seconds. Currently, this is handled with the usual pattern of keyBy().window(Tumbling...(60)).process(...). I need to provide the same output, but as a result of a timeout. The data needed for the timeout summary will be in the global state for that key. This seems possible either by using the timer service in the process function without a window (e.g. keyBy(..).process(..)) or by using a custom trigger. Why choose one or the other? -- Thanks, Aeden GitHub: https://github.com/aedenj Linked In: http://www.linkedin.com/in/aedenjameson
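To make the timer-service option concrete, here is a plain-Java model, assuming processing-time timeouts: every element re-arms one timer per key, and when a timer fires the per-key state is summarized and cleared. The class is hypothetical and only simulates the behaviour; in an actual KeyedProcessFunction the corresponding calls would be `ctx.timerService().registerProcessingTimeTimer(..)` and `deleteProcessingTimeTimer(..)`, with the summary emitted from `onTimer`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(..).process(..) where each element re-arms a per-key timeout.
public class KeyedTimeoutModel {
    private final long timeoutMs;
    private final Map<String, Long> timerByKey = new HashMap<>();    // one pending timer per key
    private final Map<String, Integer> countByKey = new HashMap<>(); // stand-in for keyed state
    private final List<String> fired = new ArrayList<>();

    public KeyedTimeoutModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    public void processElement(String key, long nowMs) {
        countByKey.merge(key, 1, Integer::sum);
        // Overwriting the entry = delete the old timer and register a new one.
        timerByKey.put(key, nowMs + timeoutMs);
    }

    // Advance the clock and "fire" every key whose timeout has passed, oldest first.
    public void advanceTo(long nowMs) {
        List<Map.Entry<String, Long>> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : timerByKey.entrySet()) {
            if (e.getValue() <= nowMs) {
                due.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
        due.sort(Map.Entry.<String, Long>comparingByValue()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        for (Map.Entry<String, Long> e : due) {
            String key = e.getKey();
            timerByKey.remove(key);
            fired.add(key + ":" + countByKey.remove(key)); // emit summary, clear state
        }
    }

    public List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        KeyedTimeoutModel m = new KeyedTimeoutModel(60_000);
        m.processElement("a", 0);
        m.processElement("b", 10_000);
        m.processElement("a", 30_000); // re-arms a's timeout to 90_000
        m.advanceTo(70_000);
        System.out.println(m.fired()); // [b:1]
        m.advanceTo(90_000);
        System.out.println(m.fired()); // [b:1, a:2]
    }
}
```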
Error while deserializing the element
Setup specifics:
* Version: 1.6.2
* RocksDB
* Map state
* Timers stored in RocksDB
When this job has been running for long periods of time (> 30 days) and it restarts for some reason, we encounter "Error while deserializing the element". Is this a known issue that is fixed in later versions? I see some code changes for FLINK-10175, but we don't use any queryable state. Below is the stack trace:
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at org.apache.flink.types.StringValue.readString(StringValue.java:769)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
    ... 20 more
-- Thanks, -Vijay
Re: Periodic output at end of stream
Hey JING, thanks for getting back to me. I tried to produce the smallest self-contained example that reproduces the phenomenon: https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f If you run MainRepl, you should see an infinite loop of re-processing the 5 integers. The offending process is BufferedLatestSelector, specifically the event-time timer that is registered in it. Without the timer, the process will not emit an output. The timer is set whenever the state is null. Is there a problem with how I implemented that buffering process? Thank you, Matthias On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG wrote: > Hi Matthias, > How often do you register the event-time timer? > Is it registered per input record, or is a new timer re-registered after an > event-time timer is triggered? > Would you please provide your test case code? It would be very helpful for > troubleshooting. > > Best wishes, > JING ZHANG > > On Sat, Aug 14, 2021 at 3:44 AM Matthias Broecheler wrote: > >> Hey guys, >> >> I have a KeyedProcessFunction that gathers statistics on the events that >> flow through and emits them periodically (every few seconds) to a SideOutput. >> However, at the end of the stream the last set of statistics doesn't get >> emitted. I read on the mailing list that pending processing-time timers >> don't get triggered when Flink cleans up a stream, but that event-time >> timers do get triggered because a watermark with Long.MAX_VALUE is sent >> through the stream. >> Hence, I thought that I could register a "backup" event-time timer for >> Long.MAX_VALUE-1 to make sure that my process function gets notified when >> the stream ends, so that it can emit the in-flight statistics. >> >> However, now my simple test case (with a fromCollection data source of 4 >> elements) keeps iterating over the same 4 elements in an infinite loop. >> >> I don't know how to make sense of this and would appreciate your help. >> Is there a better way to set a timer that gets triggered at the end of the >> stream?
>> And for my education: Why does registering an event timer cause an >> infinite loop over the source elements? >> >> Thanks a lot and have a wonderful weekend, >> Matthias >> >
Setting S3 parameters in a K8 jobmanager deployment
I have a Kubernetes JobManager deployment that requires parameters to be passed on the command line rather than read from the flink-config ConfigMap. Is there a way to do this?

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["jobmanager", "-Ds3.endpoint=https://192.173.0.0:9000", "-Ds3.access-key=key", "-Ds3.secret-key=secret"]
        ports

Robert Cullen 240-475-4490
Re: Validating Flink SQL without registering with StreamTableEnvironment
Hi Yuval, I can expand a bit more on the technical side of validation, though as a heads-up, I don't have a solution. When validating entire pipelines on a logical level, you run into the (maybe obvious) issue that statements depend on previous statements. In the simple case of a CREATE TABLE DDL followed by some query, ("full") validation of the query depends on the table actually existing. On the other hand, validating a CREATE TABLE DDL shouldn't actually execute that DDL, creating a conflict. Of course, this is only a concern if we care about the table existing during validation; from the perspective of syntax alone it wouldn't matter. However, Flink's parser (ParserImpl) under the hood calls SqlToOperationConverter, which in some places does table lookups etc., so it depends on the catalog manager. This prevents us from doing this kind of validation. Ideally, SqlToOperationConverter would not have such a dependency, but it takes some work to change that, since operations would have to be redesigned and "evaluated" later on. I think, as of now, you'd have to actually use the CalciteParser directly to bypass this call, but of course this is not accessible (non-reflectively). I've also never tried this, so I don't know whether it would actually work. It'd definitely be missing the ability to parse anything handled in Flink's "extended parser" right now, but that mostly concerns SQL-client-specific syntax. Best Ingo On Wed, Aug 18, 2021 at 2:41 PM Yuval Itzchakov wrote: > Thanks Ingo! > I just discovered this a short while before you posted :) > > Ideally, I'd like to validate that the entire pipeline is set up > correctly. The problem is that I can't use methods like `tableEnv.sqlQuery` > from multiple threads, and this is really limiting my ability to speed up > the process (today it takes over an hour to complete, which isn't > reasonable).
> > If anyone has any suggestions on how I can still leverage the > TableEnvironment in the processor to validate my SQL queries, I'd be happy > to know. > > On Wed, Aug 18, 2021 at 2:37 PM Ingo Bürk wrote: > >> Hi Yuval, >> >> if syntactical correctness is all you care about, parsing the SQL should >> suffice. You can get a hold of the parser from >> TableEnvironmentImpl#getParser and then run #parse. This will require you >> to cast your table environment to the (internal) implementation, but maybe >> this works for you? >> >> >> Best >> Ingo >> >> On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov >> wrote: >> >>> Hi, >>> >>> I have a use case where I need to validate hundreds of Flink SQL >>> queries. Ideally, I'd like to run these validations in parallel. But given >>> that there's an issue with Calcite and the use of thread-local storage, I >>> can only interact with the table runtime via a single thread. >>> >>> Ideally, I don't really care about the overall registration process of >>> sources, transformations and sinks; I just want to make sure the syntax is >>> correct from Flink's perspective. >>> >>> Is there any straightforward way of doing this? >>> >>> -- >>> Best Regards, >>> Yuval Itzchakov. >>> >> > > -- > Best Regards, > Yuval Itzchakov. >
Process hangs when getting a Hana connection in the open method of a sink function
Dear all: I have a problem when sinking data to a Hana database. The process hangs when getting the Hana connection in the open method of the sink function, shown below. My Flink version is 1.10.

public class HrrmPayValueSumToHana extends RichSinkFunction {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HrrmUtils.getHanaConnection(); // process hangs here
    }

    @Override
    public void invoke() { ... }

    @Override
    public void close() throws Exception { .. }
}

public static Connection getHanaConnection() {
    Connection con = null;
    try {
        Class.forName(HrrmConstants.HANA_DRIVER_CLASS);
        con = DriverManager.getConnection(HrrmConstants.HANA_SOURCE_DRIVER_URL,
                HrrmConstants.HANA_SOURCE_USER, HrrmConstants.HANA_SOURCE_PASSWORD);
    } catch (Exception e) {
        LOG.error("---hana get connection has exception , msg = ", e);
    }
    return con;
}

The Hana driver dependency is:

<dependency>
    <groupId>com.sap.cloud.db.jdbc</groupId>
    <artifactId>ngdbc</artifactId>
    <version>2.3.62</version>
</dependency>
Re: flink Kinesis Consumer Connected but not consuming
Hey Tarun, Your application looks OK and should work. I did notice the following, though I cannot imagine it is an issue unless you are not setting the region correctly: - getKafkaConsumerProperties() Make sure you are setting the correct region (AWSConfigConstants.AWS_REGION) in the properties. If this is OK, please check the Flink dashboard to ensure the following metrics are flowing for this operator:
- millisBehindLatest
- numberOfAggregatedRecords
- numberOfDeaggregatedRecords
Thanks, On Wed, Aug 18, 2021 at 2:33 AM tarun joshi <1985.ta...@gmail.com> wrote: > Hey All, > > I am running Flink in Docker containers (image tag flink:scala_2.11-java11) on EC2. > > I am able to connect to a Kinesis connector, but nothing is being consumed. > > My commands to start the JobManager and TaskManager:
>
> docker run \
>   --rm \
>   --volume /root/:/root/ \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=jobmanager \
>   --network flink-network \
>   --publish 8081:8081 \
>   flink:scala_2.11-java11 jobmanager &
>
> docker run \
>   --rm \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=taskmanager_0 \
>   --network flink-network \
>   flink:scala_2.11-java11 taskmanager &
>
> 2021-08-17 22:38:01,106 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='<Stream Name>', shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144},SequenceNumberRange: {StartingSequenceNumber: 49600280467722672235426674687631661510244124728928239618,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM
>
> &&& this for each shard consumer:
>
> 2021-08-17 22:38:01,107 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='web-clickstream', shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144},SequenceNumberRange: {StartingSequenceNumber: 49600280467722672235426674687631661510244124728928239618,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
>
> My program is a simple one to test out a DataStream from Kinesis:
>
> FlinkKinesisConsumer<String> kinesisConsumer =
>     new FlinkKinesisConsumer<>(
>         "", new SimpleStringSchema(),
>         getKafkaConsumerProperties());
> env.addSource(kinesisConsumer).print();
>
> env.execute("Read files in streaming fashion");
>
> Other facts:
>
> 1. I can see data flowing into our Kinesis stream continuously in the Monitoring tab of AWS.
> 2. I was facing authorization issues accessing Kinesis in our AWS infra, but I resolved that by moving into the same security group as the Kinesis deployment and creating a role with full access to Kinesis.
>
> Any pointers are really appreciated!
>
> Thanks,
> Tarun
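Fixed-delay restart strategy: counting logic
Hi everyone, I'd like to ask: when the restart strategy is restart-strategy: fixed-delay, is its parameter restart-strategy.fixed-delay.attempts a global count (over the job's whole lifetime)? Or is the retry count reset each time the job recovers through HA failover, so that counting starts again from 0 at the next failure?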
Re: Validating Flink SQL without registering with StreamTableEnvironment
Thanks Ingo! I just discovered this a short while before you posted :) Ideally, I'd like to validate that the entire pipeline is set up correctly. The problem is that I can't use methods like `tableEnv.sqlQuery` from multiple threads, and this is really limiting my ability to speed up the process (today it takes over an hour to complete, which isn't reasonable). If anyone has any suggestions on how I can still leverage the TableEnvironment in the processor to validate my SQL queries, I'd be happy to know. On Wed, Aug 18, 2021 at 2:37 PM Ingo Bürk wrote: > Hi Yuval, > > if syntactical correctness is all you care about, parsing the SQL should > suffice. You can get a hold of the parser from > TableEnvironmentImpl#getParser and then run #parse. This will require you > to cast your table environment to the (internal) implementation, but maybe > this works for you? > > > Best > Ingo > > On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov > wrote: > >> Hi, >> >> I have a use case where I need to validate hundreds of Flink SQL queries. >> Ideally, I'd like to run these validations in parallel. But given that >> there's an issue with Calcite and the use of thread-local storage, I can >> only interact with the table runtime via a single thread. >> >> Ideally, I don't really care about the overall registration process of >> sources, transformations and sinks; I just want to make sure the syntax is >> correct from Flink's perspective. >> >> Is there any straightforward way of doing this? >> >> -- >> Best Regards, >> Yuval Itzchakov. >> > -- Best Regards, Yuval Itzchakov.
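Re: Flink 1.13.1 with the Hive dialect: INSERT OVERWRITE does not clear the table's existing data when the inserted result is empty
Hi, you could open a JIRA ticket to track this. On Tue, Aug 17, 2021 at 2:47 PM Asahi Lee <978466...@qq.com.invalid> wrote: > hi! > > I use the SQL below. When my SELECT query returns 0 rows, the existing data in the target table is not cleared after the job finishes, whereas when I run it in the Hive client, the table is cleared! > INSERT OVERWRITE target_table SELECT * from source_table where id 10; -- Best regards! Rui Li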
Re: Validating Flink SQL without registering with StreamTableEnvironment
Hi Yuval, if syntactical correctness is all you care about, parsing the SQL should suffice. You can get a hold of the parser from TableEnvironmentImpl#getParser and then run #parse. This will require you to cast your table environment to the (internal) implementation, but maybe this works for you? Best Ingo On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov wrote: > Hi, > > I have a use case where I need to validate hundreds of Flink SQL queries. > Ideally, I'd like to run these validations in parallel. But given that > there's an issue with Calcite and the use of thread-local storage, I can > only interact with the table runtime via a single thread. > > Ideally, I don't really care about the overall registration process of > sources, transformations and sinks; I just want to make sure the syntax is > correct from Flink's perspective. > > Is there any straightforward way of doing this? > > -- > Best Regards, > Yuval Itzchakov. >
Re: REST API in an HA setup - must the leading JM be called?
You've pretty much answered the question yourself. *thumbs up* For the vast majority of cases you can call any JobManager. The exceptions are jar operations (because they are persisted in the JM-local filesystem, and other JMs don't know about them) and triggering savepoints (because metadata for on-going savepoint operations (i.e., the information returned when querying the savepoint operation status) is also kept locally in the JM). This does indeed imply that on JM failover all this information is lost. There are ideas to solve this, but no concrete timeline. See https://issues.apache.org/jira/browse/FLINK-18312 On 18/08/2021 11:54, Juha Mynttinen wrote: I have questions related to REST API in the case of ZooKeeper HA and a standalone cluster. But I think the questions apply to other setups too such as YARN. Let's assume a standalone cluster with multiple JobManagers. The JobManagers elect the leader among themselves and register that to ZooKeeper. When using the Flink command line, AFAIK the code will go to ZooKeeper to find the host and port of the leading JobManager and send HTTP requests there. My question is: when accessing the REST API directly (e.g. curl) does one need to call the leading JobManager or will any up and running JobManager do? And if the leader needs to be called, why is it so? Behind the scenes the REST API will connect to the leading "JobManager" over RPC, making it irrelevant which JobManager receives the HTTP request. By experimenting, I found the Web UI works fine if all the JobManagers are behind a load balancer and leading and standby JobManagers are called. The only issue I found was that when a jar is submitted (/jars/upload), it is stored on the local disk of the JobManager that happens to handle that request. As a consequence, creating a job from that jar only succeeds if the HTTP request hits the JobManager that has the file.
There might be a "hack" to overcome this limitation: set web.upload.dir to a location in S3 / GCS or elsewhere that is accessible to all JobManagers. I didn't try this. Or, in the case of uploading jars and creating jobs, ensure the same JobManager is called (bypass the load balancer). But I wonder whether there is some other reason why the leading JM should be called. A follow-up question arises. If the jars are stored only on the leading JobManager, doesn't that mean that if the leader changes, the new leader is not aware of the jars uploaded to the old leader? From the REST API's perspective this means that even in the JobManager HA setup and when always calling the leader, a simple "upload a jar and deploy a job" cycle is not guaranteed to work if the leader happens to change between the requests. Did I miss something? -- Regards, Juha
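For the "ensure the same JobManager is called" workaround, a small sketch that builds both REST targets from one JobManager address rather than from the load balancer. The host name, the sample upload path, and the assumption that the jar id is the last path segment of the upload response's "filename" field are illustrative; the multipart upload body itself is not shown.

```java
import java.net.URI;

// Sketch: address BOTH steps of "upload a jar, then run it" to one JobManager,
// because the uploaded jar lives only on that JobManager's local disk.
public class StickyJarSubmission {
    private final URI jobManager; // hypothetical address of one specific JobManager, not the LB

    public StickyJarSubmission(URI jobManager) {
        this.jobManager = jobManager;
    }

    // POST target for the multipart jar upload (request body not shown here).
    public URI uploadUri() {
        return jobManager.resolve("/jars/upload");
    }

    // Assumption: the jar id expected by /jars/:jarid/run is the last path segment
    // of the "filename" value returned by the upload call.
    public static String jarIdFromFilename(String filename) {
        return filename.substring(filename.lastIndexOf('/') + 1);
    }

    // POST target for running the uploaded jar, on the SAME JobManager.
    public URI runUri(String jarId) {
        return jobManager.resolve("/jars/" + jarId + "/run");
    }

    public static void main(String[] args) {
        StickyJarSubmission s = new StickyJarSubmission(URI.create("http://jobmanager-1:8081"));
        String jarId = jarIdFromFilename("/tmp/web-upload/1234_myjob.jar"); // hypothetical path
        System.out.println(s.uploadUri());   // http://jobmanager-1:8081/jars/upload
        System.out.println(s.runUri(jarId)); // http://jobmanager-1:8081/jars/1234_myjob.jar/run
    }
}
```

As Chesnay notes, this only pins requests to one JobManager; it does not survive that JobManager failing between the two calls.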
Validating Flink SQL without registering with StreamTableEnvironment
Hi, I have a use case where I need to validate hundreds of Flink SQL queries. Ideally, I'd like to run these validations in parallel. But given that there's an issue with Calcite and the use of thread-local storage, I can only interact with the table runtime via a single thread. Ideally, I don't really care about the overall registration process of sources, transformations and sinks; I just want to make sure the syntax is correct from Flink's perspective. Is there any straightforward way of doing this? -- Best Regards, Yuval Itzchakov.
REST API in an HA setup - must the leading JM be called?
I have questions related to REST API in the case of ZooKeeper HA and a standalone cluster. But I think the questions apply to other setups too such as YARN. Let's assume a standalone cluster with multiple JobManagers. The JobManagers elect the leader among themselves and register that to ZooKeeper. When using the Flink command line, AFAIK the code will go to ZooKeeper to find the host and port of the leading JobManager and send HTTP requests there. My question is: when accessing the REST API directly (e.g. curl) does one need to call the leading JobManager or will any up and running JobManager do? And if the leader needs to be called, why is it so? Behind the scenes the REST API will connect to the leading "JobManager" over RPC, making it irrelevant which JobManager receives the HTTP request. By experimenting, I found the Web UI works fine if all the JobManagers are behind a load balancer and leading and standby JobManagers are called. The only issue I found was that when a jar is submitted (/jars/upload), it is stored on the local disk of the JobManager that happens to handle that request. As a consequence, creating a job from that jar only succeeds if the HTTP request hits the JobManager that has the file. There might be a "hack" to overcome this limitation: set web.upload.dir to a location in S3 / GCS or elsewhere that is accessible to all JobManagers. I didn't try this. Or, in the case of uploading jars and creating jobs, ensure the same JobManager is called (bypass the load balancer). But I wonder whether there is some other reason why the leading JM should be called. A follow-up question arises. If the jars are stored only on the leading JobManager, doesn't that mean that if the leader changes, the new leader is not aware of the jars uploaded to the old leader?
From the REST API's perspective this means that even in the JobManager HA setup and when always calling the leader, a simple "upload a jar and deploy a job" cycle is not guaranteed to work if the leader happens to change between the requests. Did I miss something? -- Regards, Juha
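Re:Re: Error when combining the cumulate function with comparison functions
Oh, I see, OK, thank you. So comparison functions can't be combined with it, while some simpler conditions (where a = '2') work. Will this be adjusted in a later release? On 2021-08-18 16:21:20, "Caizhi Weng" wrote: >Hi! > >Currently, window TVFs can only be used in two scenarios: window aggregation and window Top-N. If the WHERE condition is meant to filter the results of the window >aggregation, you can use HAVING instead of WHERE; if it is meant to filter the data before it enters the window, you can create a view. > >On Wed, Aug 18, 2021 at 3:55 PM 李航飞 wrote: > >> I built a data processing pipeline with Flink SQL: >> SELECT window_start,window_end,SUM(price) >> >> FROM TABLE( >> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' >> MINUTES)) >> >> GROUP BY window_start,window_end; >> >> The statement is roughly as above. Executing it through the StreamTableEnvironment object with env.sqlQuery(sql) succeeds without problems. >> The key step, calling sta.execute() on the StatementSet object, fails with: >> java.lang.UnsupportedOperationException: >> Currently Flink doesn't support individual window table-valued function >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]). >> Please use window table-valued function with aggregate together using >> window_start and window_end as group keys. >> The environment is Flink 1.13.1. Without the WHERE condition it runs normally; with it, it fails.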
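Re: Can the Operator State API be used in a Flink SQL UDF?
Hi! SQL does not currently support stateful UDFs. You will probably need to implement this requirement with the DataStream API; see the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/ On Tue, Aug 17, 2021 at 7:04 PM andrew <15021959...@163.com> wrote: > hi, hello: > I read a Kafka data stream with Flink SQL and want to monitor user information: whenever it changes compared with the previous state value, the latest user information should be output.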
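To illustrate the DataStream route, the "emit the latest user info when it differs from the previous state" logic can be modelled in plain Java; the map stands in for per-key ValueState in a KeyedProcessFunction keyed by user id. Names are hypothetical, and emitting the first record for a user (when there is no previous state) is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Plain-Java model of a keyed "emit on change" function.
public class EmitOnChangeModel {
    private final Map<String, String> lastInfoByUser = new HashMap<>(); // stand-in for ValueState
    private final List<String> out = new ArrayList<>();

    public void processElement(String userId, String userInfo) {
        String previous = lastInfoByUser.put(userId, userInfo); // read old state, store new
        // Assumption: the first record for a user (previous == null) is also emitted.
        if (!Objects.equals(previous, userInfo)) {
            out.add(userId + "=" + userInfo);
        }
    }

    public List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        EmitOnChangeModel f = new EmitOnChangeModel();
        f.processElement("u1", "city=A");
        f.processElement("u1", "city=A"); // unchanged: nothing emitted
        f.processElement("u1", "city=B"); // changed: emitted
        System.out.println(f.output());   // [u1=city=A, u1=city=B]
    }
}
```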
Re: Error when combining the cumulate function with comparison functions
Hi! Currently, window TVFs can only be used in two scenarios: window aggregation and window Top-N. If the WHERE condition is meant to filter the results of the window aggregation, you can use HAVING instead of WHERE; if it is meant to filter the data before it enters the window, you can create a view. On Wed, Aug 18, 2021 at 3:55 PM 李航飞 wrote: > I built a data processing pipeline with Flink SQL: > SELECT window_start,window_end,SUM(price) > > FROM TABLE( > > CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' > MINUTES)) > > GROUP BY window_start,window_end; > > The statement is roughly as above. Executing it through the StreamTableEnvironment object with env.sqlQuery(sql) succeeds without problems. > The key step, calling sta.execute() on the StatementSet object, fails with: > java.lang.UnsupportedOperationException: > Currently Flink doesn't support individual window table-valued function > CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]). > Please use window table-valued function with aggregate together using > window_start and window_end as group keys. > The environment is Flink 1.13.1. Without the WHERE condition it runs normally; with it, it fails.
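The two rewrites suggested in the reply can be written out as SQL, held here in Java string constants. This is a hypothetical sketch: the predicates (price > 10, SUM(price) > 100) and the view name filtered_bid are made up, and the statements have not been verified against 1.13.1.

```java
// Hypothetical rewrites of the CUMULATE query following the reply above.
public class CumulateFilterRewrites {
    // (1) Filter rows BEFORE they enter the window: put the WHERE in a view,
    //     then apply the window TVF to the view.
    static final String CREATE_FILTERED_VIEW =
            "CREATE TEMPORARY VIEW filtered_bid AS SELECT * FROM Bid WHERE price > 10";

    static final String CUMULATE_ON_VIEW =
            "SELECT window_start, window_end, SUM(price) AS total_price "
            + "FROM TABLE(CUMULATE(TABLE filtered_bid, DESCRIPTOR(bidtime), "
            + "INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) "
            + "GROUP BY window_start, window_end";

    // (2) Filter the window aggregation RESULTS: HAVING instead of WHERE.
    static final String CUMULATE_WITH_HAVING =
            "SELECT window_start, window_end, SUM(price) AS total_price "
            + "FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), "
            + "INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) "
            + "GROUP BY window_start, window_end "
            + "HAVING SUM(price) > 100";

    public static void main(String[] args) {
        System.out.println(CREATE_FILTERED_VIEW);
        System.out.println(CUMULATE_ON_VIEW);
        System.out.println(CUMULATE_WITH_HAVING);
    }
}
```

With a TableEnvironment `tEnv`, the view would be registered with `tEnv.executeSql(CREATE_FILTERED_VIEW)` before the windowed query is added to the StatementSet.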
Error when combining the cumulate function with comparison functions
I built a data processing pipeline with Flink SQL: SELECT window_start,window_end,SUM(price) FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' MINUTES)) GROUP BY window_start,window_end; The statement is roughly as above. Executing it through the StreamTableEnvironment object with env.sqlQuery(sql) succeeds without problems. The key step, calling sta.execute() on the StatementSet object, fails with: java.lang.UnsupportedOperationException: Currently Flink doesn't support individual window table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]). Please use window table-valued function with aggregate together using window_start and window_end as group keys. The environment is Flink 1.13.1. Without the WHERE condition it runs normally; with it, it fails.
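Re: Replacing some Spark processing logic with Flink in an advertising business: is a ProcessFunction needed?
So this is about handling out-of-order data, right? And if a record still can't be joined after 10 retries, you give up on it? In Flink this is basically a two-stream interval join. The watermark setting needs some thought: if you are not sensitive to latency, simply use 30 minutes; if you are, you could also do tiered retries. This is pure guesswork. On 2021-08-18 10:25:49, "张锴" wrote: >Requirement: >We need to replace our current Spark program with Flink. While working through the logic, there is one part I don't know how to implement in Flink. The Spark job runs in batches every three minutes. >Description: >Ad logs follow the sequence ask log -> bid -> show -> click. The requirement is to merge each of the other logs with the bid log to guarantee the completeness of the bid data; the unique key is sessionid+Adid. >Logic: Spark reads multiple log topics, >including xxtopic, formats them, and after joinAll obtains (string, pair). If the log type pair.logType is 'bid', the record is written directly to bidtopic. For other types, it is matched against the bid table in the existing HBase cache; if a match is found (it may be a show >or click ..), the merged result is written to bidtopic. >If no match is found, pair.n records the attempt count and the record is written to xxtopic. If no bid data has been matched after n>10 attempts (30 minutes of looping), the record is written directly to bidtopic; if no bid is matched within n<=10 attempts, >n is incremented by 1 and the record is written to xxtopic to enter the next batch. >The 10 attempts were requested by the business side, i.e. a 30-minute cache. Without the 10-attempt limit, a lot of data would keep being written to xxtopic. No computation is involved here, only merging, and there is no deduplication: if 3 identical records are found for a key, all three must be merged. > >How can this be implemented with Flink?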
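A plain-Java model of the requirement as described may help when mapping it to Flink: each non-bid event waits up to 30 minutes for the bid with the same sessionid+Adid key, is merged (without deduplication) if the bid shows up, and is otherwise written out unmerged. This is a sketch under stated assumptions, not the interval-join approach suggested in the reply: the class name and the `|`-joined key are hypothetical, a per-event deadline replaces the 10-retry counter, and the stored bid is never expired here (a real job would need a cleanup timer or state TTL). In Flink, the maps would become keyed state and advanceTo would correspond to timers in a KeyedProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model: per key (sessionId + adId), non-bid events wait for the bid.
public class BidMergeModel {
    static final long WAIT_MS = 30 * 60 * 1000L; // 10 retries x 3-minute batches

    private static final class Pending {
        final String payload;
        final long deadlineMs;

        Pending(String payload, long deadlineMs) {
            this.payload = payload;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Map<String, String> bidByKey = new HashMap<>();
    private final Map<String, List<Pending>> pendingByKey = new TreeMap<>(); // sorted: stable output
    private final List<String> bidTopic = new ArrayList<>();

    public void onEvent(String key, String logType, String payload, long nowMs) {
        if ("bid".equals(logType)) {
            bidByKey.put(key, payload);
            bidTopic.add(payload); // bid logs go straight to bidtopic
            List<Pending> waiting = pendingByKey.remove(key);
            if (waiting != null) {
                for (Pending p : waiting) {
                    bidTopic.add(payload + "+" + p.payload); // merge everything that waited
                }
            }
            return;
        }
        String bid = bidByKey.get(key);
        if (bid != null) {
            bidTopic.add(bid + "+" + payload); // merge, no deduplication
        } else {
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>())
                    .add(new Pending(payload, nowMs + WAIT_MS));
        }
    }

    // Clock advance: events whose 30 minutes are up are written to bidtopic unmerged.
    public void advanceTo(long nowMs) {
        Iterator<Map.Entry<String, List<Pending>>> keys = pendingByKey.entrySet().iterator();
        while (keys.hasNext()) {
            List<Pending> waiting = keys.next().getValue();
            Iterator<Pending> it = waiting.iterator();
            while (it.hasNext()) {
                Pending p = it.next();
                if (p.deadlineMs <= nowMs) {
                    bidTopic.add(p.payload);
                    it.remove();
                }
            }
            if (waiting.isEmpty()) {
                keys.remove();
            }
        }
    }

    public List<String> bidTopic() {
        return bidTopic;
    }

    public static void main(String[] args) {
        BidMergeModel m = new BidMergeModel();
        m.onEvent("s1|ad1", "show", "show1", 0);         // no bid yet: waits
        m.onEvent("s1|ad1", "bid", "bid1", 60_000);      // emits bid1 and bid1+show1
        m.onEvent("s1|ad1", "click", "click1", 120_000); // bid known: bid1+click1
        m.onEvent("s2|ad2", "show", "show2", 0);         // never matched
        m.advanceTo(WAIT_MS);                            // show2 is given up on
        System.out.println(m.bidTopic()); // [bid1, bid1+show1, bid1+click1, show2]
    }
}
```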