Re: Kinesis Sink - Data being received with intermittent breaks
Hi Zain,

I'm looping in Danny here; he is probably the most knowledgeable when it comes to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati wrote:
> Hi,
> I'm fetching data from Kafka topics, converting it to chunks of <= 1 MB and sinking them to a Kinesis data stream.
> The streaming job is functional, however I see bursts of data in the Kinesis stream with intermittent dips where the data received is 0. I'm attaching the configuration parameters for the Kinesis sink. What could be the cause of this issue?
> The data is fed into the DataStream from a Kafka topic, which in turn is fed by a MongoDB with about 60 million records that are loaded fully.
> I am trying to configure the parameters in such a way that the 1 MB per data payload limit of Kinesis is not breached. Would appreciate help on this!
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
> producerConfig.put("AggregationMaxCount", "3");
> producerConfig.put("AggregationMaxSize", "256");
> producerConfig.put("CollectionMaxCount", "3");
> producerConfig.put("CollectionMaxSize", "10");
> producerConfig.put("AggregationEnabled", true);
> producerConfig.put("RateLimit", "50");
> producerConfig.put("RecordMaxBufferedTime", "1000");
> producerConfig.put("ThreadingModel", "POOLED");
> FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setFailOnError(false);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");
> kinesis.setQueueLimit(1000);
>
> *Data in Kinesis:*
> [image: image.png]

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
Re: Split string flatmap -> Arraylist/List
Hi Zain,

There are different options to achieve this. You can take a look at the WordCount example [1] to get some inspiration; a minimal flatMap sketch is also included below the quoted message.

[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

Best regards,
Alexander

On Fri, May 13, 2022 at 9:30 AM Zain Haider Nemati wrote:
> Hi,
> I have a comma-separated string of the format
> x,y,z \n
> a,b,c ...
>
> I want to split the string on the basis of '\n' and insert each string into the data stream separately using *flatmap*. Any example on how to do that? If I add the chunks into an ArrayList or List and return that from the flatmap function, would that work?

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
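A minimal sketch of that approach, assuming each input element is a single String containing newline-separated rows (there is no need to buffer the pieces in an ArrayList; the Collector emits each row as its own element):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Splits an incoming block of text on '\n' and emits each line as a separate record.
    public class LineSplitter implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) {
            for (String line : value.split("\n")) {
                if (!line.isEmpty()) {
                    out.collect(line); // each "x,y,z" row becomes its own stream element
                }
            }
        }
    }

    // usage (hypothetical 'source' stream of newline-delimited blocks):
    // DataStream<String> rows = source.flatMap(new LineSplitter());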
Re: http stream as input data source
Hi Harald,

I was previously investigating this topic as well. There are some community efforts for HTTP sources, please have a look at the references below:

https://getindata.com/blog/data-enrichment-flink-sql-http-connector-flink-sql-part-one/
https://github.com/getindata/flink-http-connector
https://github.com/galgus/flink-connector-http

If none of those fit, a custom source is also an option; a rough sketch follows below the quoted thread.

Best regards,
Alexander

On Thu, May 12, 2022 at 1:59 PM Xuyang wrote:
> Hi, there is currently no HTTP source, and you can build a custom data source manually just like Yuxia said.
>
> Yuxia has given you a quick way to build a custom connector with the Table API. But if you want to use the DataStream API to do that, you can refer to here [1].
>
> You can also open an issue to start a discussion in the Flink community here [2] to let the community support this feature officially.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#use-the-source
> [2] https://issues.apache.org/jira/projects/FLINK/issues/
>
> At 2022-05-12 11:37:29, "Harald Busch" wrote:
> > Hi,
> > is there an HTTP data stream as a data source?
> > I only see socketTextStream and other predefined stream sources.
> > It seems that I have to use fromCollection, fromElements ... and prepare the collection myself.
> > Thanks
> > Regards

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
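A rough sketch of such a hand-rolled polling source using the legacy SourceFunction interface; the endpoint URL, poll interval and missing error handling are placeholders, not a production-ready connector:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    // Periodically polls an HTTP endpoint and emits the response body as a String record.
    public class HttpPollingSource extends RichSourceFunction<String> {
        private final String url; // hypothetical endpoint
        private volatile boolean running = true;

        public HttpPollingSource(String url) {
            this.url = url;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
            while (running) {
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                synchronized (ctx.getCheckpointLock()) { // emit under the checkpoint lock
                    ctx.collect(response.body());
                }
                Thread.sleep(5_000); // placeholder poll interval
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // usage: env.addSource(new HttpPollingSource("https://example.com/data"))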
Re: At-least once sinks and their behaviour in a non-failure scenario
Hi Piotr,

You are correct regarding the savepoint, there should be no duplicates sent to RabbitMQ. A minimal sketch of the checkpointing setup this relies on follows below the quoted message.

Best regards,
Alexander

On Thu, May 12, 2022 at 11:28 AM Piotr Domagalski wrote:
> Hi,
>
> I'm planning to build a pipeline that is using a Kafka source, some stateful transformation and a RabbitMQ sink. What I don't yet fully understand is how common I should expect the "at-least once" scenario (i.e. seeing duplicates) on the sink side. The case when things start failing is clear to me, but what happens when I want to gracefully stop the Flink job?
>
> Am I right in thinking that when I gracefully stop a job with a final savepoint [1], then what happens is that the Kafka source stops consuming, a checkpoint barrier is sent through the pipeline and this will flush the sink completely? So my understanding is that if nothing fails and the Kafka offset is committed, when the job is started again from that savepoint, it will not result in any duplicates being sent to RabbitMQ. Is that correct?
>
> Thanks!
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>
> --
> Piotr Domagalski

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
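For completeness, a minimal sketch of the checkpointing setup such a pipeline relies on (the interval is a placeholder); the final savepoint taken on a graceful stop acts like one last checkpoint that flushes the sink before the Kafka offsets are committed:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointingSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Exactly-once checkpoints: barriers flow through the pipeline, and on a
            // graceful stop with a final savepoint the sink is flushed before offsets commit.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
            // ... Kafka source -> stateful transformation -> RabbitMQ sink would be wired up here
        }
    }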
Re: Exception Handling in ElasticsearchSink
Hi Rion,

Sorry for the late reply. There should be no problems instantiating the metric in the open() function and passing down its reference through createSink and buildSinkFromRoute. I'd be happy to help in case you encounter any issues; a rough sketch of the idea follows below the quoted message.

Best,
Alexander

On Thu, Apr 21, 2022 at 10:49 PM Rion Williams wrote:
> Hi all,
>
> I've recently been encountering some issues that I've noticed in the logs of my Flink job that handles writing to an Elasticsearch index. I was hoping to leverage some of the metrics that Flink exposes (or piggyback on them) to update metric counters when I encounter specific kinds of errors.
>
> val builder = ElasticsearchSink.Builder(...)
>
> builder.setFailureHandler { actionRequest, throwable, _, _ ->
>     // Log error here (and update metrics via metricGroup.counter(...))
> }
>
> return builder.build()
>
> Is there a way to handle this currently? My specific implementation has a process function that manages multiple sinks (so I can create these dynamically), but in the case of these errors, it doesn't look like I can access the metric group within the setFailureHandler at present.
>
> My initial thought was that in my parent process function, I could pass in the context to the child sinks so that I'd have context for the exceptions/metrics:
>
> class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, *>>(
>     /**
>      * Defines a router that maps an element to its corresponding ElasticsearchSink instance
>      * @param sinkRouter A [ElasticSinkRouter] that takes an element of type [ElementT], a string-based route
>      * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [ElasticsearchSink]
>      */
>     private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
> ) : RichSinkFunction<ElementT>(), CheckpointedFunction {
>
>     // Store a reference to all of the current routes
>     private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
>     private lateinit var configuration: Configuration
>
>     override fun open(parameters: Configuration) {
>         configuration = parameters
>     }
>
>     override fun invoke(value: ElementT, context: SinkFunction.Context) {
>         val route = sinkRouter.getRoute(value)
>         var sink = sinkRoutes[route]
>         if (sink == null) {
>             // Here's where the sink is constructed when an exception occurs
>             sink = sinkRouter.createSink(route, value)
>             sink.runtimeContext = runtimeContext
>             sink.open(configuration)
>             sinkRoutes[route] = sink
>         }
>
>         sink.invoke(value, context)
>     }
> }
>
> I'd imagine within the open call for this function, I could store the metrics group and pass it into my createSink() call so the child sinks would have a reference to it. Does that seem feasible or is there another way to handle this?
>
> Thanks all,
>
> Rion

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
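As a rough illustration of that idea (a sketch in Java with hypothetical names; the routing and sink-creation logic from the snippet above is elided):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Minimal sketch: the parent sink registers a Counter in open() and hands the
    // reference down to the routed child sinks (createSink etc. are hypothetical).
    public class MetricAwareDynamicSink<T> extends RichSinkFunction<T> {

        private transient Counter esFailureCounter;

        @Override
        public void open(Configuration parameters) {
            // register the counter once, on the operator's own metric group
            esFailureCounter = getRuntimeContext().getMetricGroup().counter("es-sink-failures");
        }

        @Override
        public void invoke(T value, Context context) {
            // In the real implementation the counter reference would be passed to
            // sinkRouter.createSink(route, value, esFailureCounter) so that the
            // failure handler of the child ElasticsearchSink can increment it.
        }
    }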
Re: Reactive mode and checkpointing
Hello,

There are no scheduler-specific options for checkpointing. You can however set `execution.checkpointing.tolerable-failed-checkpoints` to 0 to forbid checkpoint failures (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints). The same limit can also be set programmatically; a small sketch follows below the quoted message.

Best regards,
Alexander

On Tue, Apr 12, 2022 at 6:43 AM aryan m wrote:
> Hello!
>
> Are there options in reactive mode to prevent a job from restarting if the last checkpoint failed or timed out for any reason?
>
> Thanks,
> AR

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
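A small sketch of the equivalent programmatic setting (the checkpoint interval is a placeholder):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StrictCheckpointFailures {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // placeholder interval
            // Equivalent to execution.checkpointing.tolerable-failed-checkpoints: 0,
            // i.e. any failed checkpoint fails (and restarts) the job.
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        }
    }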
Re: FileSystem format
Hi Ian,

Unfortunately, configuring the naming is only possible when using the FileSystem connector from DataStream. If that is an option for you, the configuration is explained here:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#part-file-configuration
A rough sketch follows below the quoted message.

Best,
Alexander

On Wed, Mar 23, 2022 at 4:06 AM lan tran wrote:
> Hi team,
> So basically, when I use the Flink Table API to generate the files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. So my question is: is there any way we can configure these file names (by adding the last_modified_value)?
>
> Best,
> Quynh

--
Alexander Preuß | Junior Engineer - Data Intensive Systems
alexanderpre...@ververica.com
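A rough sketch of that DataStream-side configuration (bucket path, prefix and suffix are placeholders; a timestamp-like value can be baked into the prefix when the sink is built):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

    public class PartFileNaming {
        public static FileSink<String> buildSink() {
            // Custom prefix/suffix for the part files written by the FileSink.
            OutputFileConfig fileConfig = OutputFileConfig.builder()
                    .withPartPrefix("my-data-" + System.currentTimeMillis()) // placeholder prefix
                    .withPartSuffix(".txt")
                    .build();

            return FileSink
                    .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                    .withOutputFileConfig(fileConfig)
                    .build();
        }
        // usage: stream.sinkTo(PartFileNaming.buildSink());
    }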
Re: Flink failure rate restart not work as expect
Hi,

At first glance it looks like the exception was thrown very rapidly, so it exceeded the maxFailuresPerInterval and the FailureRateRestartStrategy decided not to restart. Why do you think this is different from the expected behavior? For reference, a sketch of how this strategy is configured follows below the quoted message.

Best,
Alex

On Tue, Mar 1, 2022 at 3:23 AM 刘 家锹 wrote:
> Hi, all
> We encounter a problem with FailureRateRestartStrategy which confuses us, and we don't know how to solve it. Here's the situation:
>
> Flink version: 1.10.1
> Deployment env: on YARN
>
> FailureRateRestartStrategy: failuresIntervalMS=6,backoffTimeMS=15000,maxFailuresPerInterval=4
>
> One of our Hadoop machines, which our job's TaskManager was running on, got stuck without response. At this moment the JobManager received a heartbeat timeout exception, but after the exception was thrown 4 times in a very short time (about 10 ms apart), it hit the FailureRateRestartStrategy and the whole job quit; we got the message 'org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy'.
> As I understand from the documentation, the expected behavior is that the JobManager should try to restart the job, which would bring up a new TaskManager on another machine, but it did not.
> We also ran a test: we started a new job and just killed the TaskManager, and it restarted as expected.
> This confuses us a lot; if anyone knows what happened, we would appreciate the help.
>
> JobManager log and TaskManager log appended below

--
Alexander Preuß | Junior Engineer - Data Intensive Systems
alexanderpre...@ververica.com
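For reference, a sketch of how a failure-rate restart strategy is configured programmatically; the values below are placeholders, not the poster's actual settings:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FailureRateRestartSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Restart the job unless more than 4 failures occur within a 60s window;
            // wait 15s between restart attempts. If the limit is exceeded, the job fails
            // with "Recovery is suppressed by FailureRateRestartBackoffTimeStrategy".
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    4,                              // maxFailuresPerInterval
                    Time.of(60, TimeUnit.SECONDS),  // failuresInterval
                    Time.of(15, TimeUnit.SECONDS)   // backoffTime between restart attempts
            ));
        }
    }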
Re: [state.checkpoints.num-retained ]The default value does not take effect
Hi Chenqizhu,

If you believe what you have found is a bug, please file an issue of type Bug in the ASF Jira (https://issues.apache.org/jira/projects/FLINK/issues) and select 1.13.3 as the affected version.

Best regards,
Alexander

On Fri, Dec 17, 2021 at 8:56 AM chenqizhu wrote:
> hi,
>
> The configuration is valid only when I specify -Dstate.checkpoints.num-retained=n on the command line interface.
> If I do not specify this configuration, the default value does not take effect. Is it a bug?
>
> my flink version: flink-1.13.3

--
Alexander Preuß | Junior Engineer - Data Intensive Systems
alexanderpre...@ververica.com
Re: Information request: Reactive mode and Rescaling
Hi Morgan,

Regarding your first question: if the Kafka connector is configured to use exactly-once semantics, it will check the offsets of the partitions when recovering from the checkpoint, so there will be no data loss or duplication. I'm not quite sure I understood the second part of the first question regarding the HPA; are you asking whether Flink actively triggers a new checkpoint before the pod shutdown? If so, the answer is no, it will use the latest completed checkpoint.

For the second question: it will use the latest checkpoint; there is no 'active'/forced creation of a new checkpoint before the re-scaling. You can find some more information about the mechanism here:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/

Best regards,
Alexander

On Wed, Dec 15, 2021 at 10:24 PM Geldenhuys, Morgan Karl <morgan.geldenh...@tu-berlin.de> wrote:
> Greetings,
>
> I would like to find out more about Flink's new reactive mode as well as the rescaling feature regarding fault tolerance. For the following questions, let's assume checkpointing is enabled using HDFS.
>
> First question: if I have a job where the source(s) and sink(s) are configured to use Kafka with exactly-once processing enabled, how does reactive mode handle this? On reconfigurations using the Horizontal Pod Autoscaler, does it recover to the latest checkpoint?
>
> Second question: for re-scaling, does it automatically create a savepoint and then rescale, or does it use the latest checkpoint to ensure results are consistent during reconfiguration?
>
> Thank you in advance!
>
> M.

--
Alexander Preuß | Junior Engineer - Data Intensive Systems
alexanderpre...@ververica.com
Re: pyFlink + asyncio
Hi Михаил,

From looking at https://nightlies.apache.org/flink/flink-docs-master/api/python//pyflink.datastream.html there is currently no AsyncFunction / RichAsyncFunction implementation in PyFlink, so you are bound to synchronous interaction.

Best regards,
Alexander

On Thu, Dec 16, 2021 at 12:47 PM Королькевич Михаил wrote:
> Hi team!
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
> Is it possible to use this for PyFlink?
> Or is there another way to do asynchronous enrichment of an unordered data stream?

--
Alexander Preuß | Junior Engineer - Data Intensive Systems
alexanderpre...@ververica.com
Re: custom metrics in elasticsearch ActionRequestFailureHandler
Hi Lars,

What is your use case for the failure handler, just collecting metrics? We want to remove the configurable failure handler in the new Sink API implementation of the Elasticsearch connector in Flink 1.15, because it can be a huge footgun with regard to delivery guarantees.

Best regards,
Alexander

On Thu, Dec 2, 2021 at 6:23 PM Lars Bachmann wrote:
> Hi David,
>
> Thanks for the reply. I think that especially in an error/failure handler, metrics are important in order to have proper monitoring/alerting in such cases. Would be awesome if this could be added to Flink at some point :).
>
> Regards,
> Lars
>
> On 02.12.2021 at 18:13, David Morávek wrote:
>> Hi Lars,
>>
>> quickly looking at the ES connector code, I think you're right and there is no way to do that :( In general I'd say that being able to expose metrics is a valid request.
>>
>> I can imagine having some kind of `RichActionRequestFailureHandler` with `{get|set}RuntimeContext` methods. More or less the same thing we already do with, for example, the `RichFunction`. This unfortunately requires some work on the Flink side.
>>
>> cc @Arvid
>>
>> On Thu, Dec 2, 2021 at 5:52 PM wrote:
>>> Hi,
>>>
>>> is there a way to expose custom metrics within an Elasticsearch failure handler (ActionRequestFailureHandler)? To register custom metrics I need access to the runtime context, but I don't see a way to access the context in the failure handler.
>>>
>>> Thanks and regards,
>>>
>>> Lars
Re: how to run streaming process after batch process is completed?
Hi Vtygoss,

Can you explain a bit more about your ideal pipeline? Is the batch data bounded data, or could you also process it in streaming execution mode? And is the streaming data derived from the batch data, or do you just want to ensure that the batch has finished before running the processing of the streaming data?

Best regards,
Alexander

(sending again because I accidentally left out the user ML in the reply on the first try)

On Tue, Nov 30, 2021 at 12:38 PM vtygoss wrote:
> Hi, community!
>
> With Flink, I want to unify batch processing and stream processing in a data production pipeline. A batch process is used to process the inventory data, then a streaming process is used to process the incremental data. But I have hit a problem: there is no state in batch mode, and the result is wrong if I run the stream process directly.
>
> So how can I run the streaming process accurately after the batch process is completed? Is there any doc or demo for handling this scenario?
>
> Thanks for any reply or suggestion!
>
> Best Regards!
Re: Could not retrieve submitted JobGraph from state handle
Hi Alexey,

Are you maybe reusing the cluster-id? Also, could you provide some more information on your setup and a more complete stacktrace? The ConfigMap contains pointers to the actual files on Azure.

Best,
Alexander

On Tue, Nov 16, 2021 at 6:14 AM Alexey Trenikhun wrote:
> Hello,
> We are using Kubernetes HA and Azure Blob storage, and in rare cases I see the following error:
>
> Could not retrieve submitted JobGraph from state handle under jobGraph-. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>
> The question is, how exactly can I clean the "state handle store"? Delete the fsp-dispatcher-leader ConfigMap? Or some files (which ones) in Azure Blob storage?
>
> Thanks,
> Alexey