Re: Kinesis Sink - Data being received with intermittent breaks

2022-05-16 Thread Alexander Preuß
Hi Zain,

I'm looping in Danny here; he is probably the most knowledgeable when it
comes to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati 
wrote:

> Hi,
> I'm fetching data from Kafka topics, converting it into chunks of <= 1 MB, and
> sinking them to a Kinesis data stream.
> The streaming job is functional; however, I see bursts of data in the Kinesis
> stream with intermittent dips where the data received is 0. I'm attaching the
> configuration parameters for the Kinesis sink. What could be the cause of this
> issue?
> The data is fed into the datastream by a Kafka topic, which in turn is fed by a
> MongoDB collection with about 60 million records, all of which are loaded.
> I am trying to configure the parameters so that the 1 MB per-record payload
> limit of Kinesis is not breached. Would appreciate help on this!
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
> producerConfig.put("AggregationMaxCount", "3");
> producerConfig.put("AggregationMaxSize", "256");
> producerConfig.put("CollectionMaxCount", "3");
> producerConfig.put("CollectionMaxSize", "10");
> producerConfig.put("AggregationEnabled", true);
> producerConfig.put("RateLimit", "50");
> producerConfig.put("RecordMaxBufferedTime", "1000");
> producerConfig.put("ThreadingModel", "POOLED");
> FlinkKinesisProducer<String> kinesis = new
> FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setFailOnError(false);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");
> kinesis.setQueueLimit(1000);
>
> *Data in Kinesis :*
> [image: image.png]
>


-- 

Alexander Preuß | Engineer - Data Intensive Systems

alexanderpre...@ververica.com

<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH

Registered at Amtsgericht Charlottenburg: HRB 158244 B

Managing Directors: Alexander Walden, Karl Anton Wehner, Yip Park Tung
Jason, Jinwei (Kevin) Zhang


Re: Split string flatmap -> Arraylist/List

2022-05-13 Thread Alexander Preuß
Hi Zain,

There are different options to achieve this. You can take a look at the
WordCount example [1] to get some inspiration.

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
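
For illustration, here is a rough (untested) Java sketch, assuming "input" is a
DataStream<String> holding your newline-separated text. The Collector lets you
emit each piece directly, so there is no need to return an ArrayList from the
flatMap function:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// 'input' is a placeholder name for your DataStream<String> of newline-separated text
DataStream<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) {
        // emit every '\n'-separated chunk as its own stream element
        for (String chunk : value.split("\n")) {
            if (!chunk.isEmpty()) {
                out.collect(chunk);
            }
        }
    }
});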

Best regards,
Alexander

On Fri, May 13, 2022 at 9:30 AM Zain Haider Nemati 
wrote:

> Hi,
> I have a comma-separated string of the format
> x,y,z \n
> a,b,c ...
>
> I want to split the string on '\n' and insert each resulting string into the
> data stream separately using *flatmap*. Any example of how to do that? If I
> add the chunks into an ArrayList or List and return that from the flatmap
> function, would that work?
>




Re: http stream as input data source

2022-05-12 Thread Alexander Preuß
Hi Harald,

I was previously investigating this topic as well. There are some community
efforts for HTTP sources; please have a look at the references below:

https://getindata.com/blog/data-enrichment-flink-sql-http-connector-flink-sql-part-one/
https://github.com/getindata/flink-http-connector
https://github.com/galgus/flink-connector-http
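
If you only need something simple with the DataStream API in the meantime, a
bare-bones polling source is another option. The sketch below is purely
illustrative (the endpoint URL and poll interval are made-up parameters, and
there are no retries and no checkpointed state), not a supported connector:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HttpPollingSource implements SourceFunction<String> {

    private final String url;
    private final long pollIntervalMs;
    private volatile boolean running = true;

    public HttpPollingSource(String url, long pollIntervalMs) {
        this.url = url;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        while (running) {
            // poll the endpoint and emit the response body as one element
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(response.body());
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}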

Best regards,
Alexander

On Thu, May 12, 2022 at 1:59 PM Xuyang  wrote:

> Hi, there is no HTTP source currently, and you can build a custom data
> source manually, just like Yuxia said.
>
> Yuxia has given you a quick way to build a custom connector with the Table
> API. But if you want to use the DataStream API to do that, you can refer to
> [1].
>
> You can also open an issue to start a discussion in the Flink community
> here [2] to let the community support this feature officially.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#use-the-source
> [2] https://issues.apache.org/jira/projects/FLINK/issues/
>
> At 2022-05-12 11:37:29, "Harald Busch"  wrote:
>
> Hi,
> is there an HTTP data stream as a data source?
> I only see socketTextStream and other predefined stream sources.
> It seems that I have to use fromCollection, fromElements ... and prepare
> the collection myself.
> Thanks
> Regards
>
>



Re: At-least once sinks and their behaviour in a non-failure scenario

2022-05-12 Thread Alexander Preuß
Hi Piotr,

You are correct regarding the savepoint: there should be no duplicates sent
to RabbitMQ.

Best regards,
Alexander

On Thu, May 12, 2022 at 11:28 AM Piotr Domagalski 
wrote:

> Hi,
>
> I'm planning to build a pipeline that is using Kafka source, some stateful
> transformation and a RabbitMQ sink. What I don't yet fully understand is
> how often I should expect the "at-least-once" scenario (i.e. seeing
> duplicates) on the sink side. The case when things start failing is clear
> to me, but what happens when I want to gracefully stop the Flink job?
>
> Am I right in thinking that when I gracefully stop a job with a final
> savepoint [1] then what happens is that Kafka source stops consuming, a
> checkpoint barrier is sent through the pipeline and this will flush the
> sink completely? So my understanding is that if nothing fails and that
> Kafka offset is committed, when the job is started again from that
> savepoint, it will not result in any duplicates being sent to RabbitMQ. Is
> that correct?
>
> Thanks!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>
> --
> Piotr Domagalski
>




Re: Exception Handling in ElasticsearchSink

2022-04-29 Thread Alexander Preuß
Hi Rion,
Sorry for the late reply. There should be no problems instantiating the
metric in the open() function and passing down its reference through
createSink and buildSinkFromRoute. I'd be happy to help in case you
encounter any issues.
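
For what it's worth, a stripped-down Java sketch of that pattern (the class and
metric names here are hypothetical, not taken from your code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// hypothetical parent function: register the counter once in open() and hand the
// reference to whatever builds the per-route sinks / failure handlers
public abstract class MetricAwareDynamicSink<T> extends RichSinkFunction<T> {

    protected transient Counter elasticFailures;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        elasticFailures = getRuntimeContext().getMetricGroup().counter("elasticsearchFailures");
        // pass 'elasticFailures' into createSink(...) so the failure handler can
        // simply call elasticFailures.inc() when a request fails
    }
}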

Best,
Alexander

On Thu, Apr 21, 2022 at 10:49 PM Rion Williams 
wrote:

> Hi all,
>
> I've recently been encountering some issues that I've noticed in the logs
> of my Flink job that handles writing to an Elasticsearch index. I was
> hoping to leverage some of the metrics that Flink exposes (or piggyback on
> them) to update metric counters when I encounter specific kinds of errors.
>
> val builder = ElasticsearchSink.Builder(...)
>
> builder.setFailureHandler { actionRequest, throwable, _, _ ->
> // Log error here (and update metrics via metricGroup.counter(...)
> }
>
> return builder.build()
>
> Is there a way to handle this currently? My specific implementation has a
> process function that manages multiple sinks (so I can create these
> dynamically), but in the case of these errors, it doesn't look like I can
> access the metric group within the setFailureHandler at present.
>
> My initial thought was in my parent process function, I could pass in the
> context to the child sinks so that I'd have context for the
> exceptions/metrics:
>
> class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, *>>(
>     /**
>      * Defines a router that maps an element to its corresponding ElasticsearchSink instance
>      * @param sinkRouter An [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
>      * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [ElasticsearchSink]
>      */
>     private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
> ) : RichSinkFunction<ElementT>(), CheckpointedFunction {
>
>     // Store a reference to all of the current routes
>     private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
>     private lateinit var configuration: Configuration
>
>     override fun open(parameters: Configuration) {
>         configuration = parameters
>     }
>
>     override fun invoke(value: ElementT, context: SinkFunction.Context) {
>         val route = sinkRouter.getRoute(value)
>         var sink = sinkRoutes[route]
>         if (sink == null) {
>             // Here's where the sink is constructed when an exception occurs
>             sink = sinkRouter.createSink(route, value)
>             sink.runtimeContext = runtimeContext
>             sink.open(configuration)
>             sinkRoutes[route] = sink
>         }
>
>         sink.invoke(value, context)
>     }
> }
>
> I'd imagine within the open call for this function, I could store the
> metrics group and pass it into my createSink() call so the child sinks
> would have a reference to it. Does that seem feasible or is there another
> way to handle this?
>
> Thanks all,
>
> Rion
>
>



Re: Reactive mode and checkpointing

2022-04-12 Thread Alexander Preuß
Hello,

There are no scheduler-specific options for checkpointing. You can however
set `execution.checkpointing.tolerable-failed-checkpoints` to 0 to forbid
checkpoint failures (
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
).
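
If you prefer to set it in code, a programmatic equivalent (sketch) would be:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// fail the job as soon as a single checkpoint fails or times out
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);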

Best regards,
Alexander



On Tue, Apr 12, 2022 at 6:43 AM aryan m  wrote:

> Hello !
>
> Are there options in reactive mode to prevent a job from restarting if
> the last checkpoint failed or timed out for any reason?
>
>
> Thanks,
> AR
>
>



Re: FileSystem format

2022-03-23 Thread Alexander Preuß
Hi Ian,

Unfortunately, configuring the naming is only possible when using the
FileSystem connector from the DataStream API. If this is an option for you,
the configuration is explained here:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#part-file-configuration
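
To sketch what that looks like with the DataStream FileSink (the prefix, suffix
and path below are made-up values; note that the prefix/suffix are static, so a
per-file last_modified value is still not possible):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// part files are then named roughly <prefix>-<subtask>-<counter><suffix>
OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("my-data")
        .withPartSuffix(".json")
        .build();

FileSink<String> sink = FileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();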

Best,
Alexander

On Wed, Mar 23, 2022 at 4:06 AM lan tran  wrote:

> Hi team,
> So basically, I use the Flink Table API to generate files and store them
> in S3. The file names look like
> part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. So my question is: is there
> any way we can configure these file names (for example, by adding the
> last_modified_value to the name)?
>
> Best,
> Quynh
>




Re: Flink failure rate restart not work as expect

2022-02-28 Thread Alexander Preuß
Hi,
At first glance it looks like the exception was thrown very rapidly, so it
exceeded maxFailuresPerInterval and the FailureRateRestartStrategy decided not
to restart. Why do you think this is different from the expected behavior?
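
For reference, the programmatic shape of that strategy is roughly the following
(a sketch only; the interval value is a placeholder, since the failuresIntervalMS
in your mail looks truncated):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        4,                               // maxFailuresPerInterval (your setting)
        Time.of(60, TimeUnit.SECONDS),   // failuresInterval (placeholder value)
        Time.of(15, TimeUnit.SECONDS))); // delay between restart attempts (your 15000 ms)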

Best,
Alex

On Tue, Mar 1, 2022 at 3:23 AM 刘 家锹  wrote:

> Hi, all
> We have encountered a problem with FailureRateRestartStrategy that confuses
> us, and we don't know how to solve it. Here's the situation:
>
> Flink version: 1.10.1
> Development env: on Yarn
>
> FailureRateRestartStrategy: 
> failuresIntervalMS=6,backoffTimeMS=15000,maxFailuresPerInterval=4
>
> One of our Hadoop machines, which our job's taskmanager was running on, got
> stuck and stopped responding. At that moment the jobmanager received a
> heartbeat timeout exception, but after the exception was thrown 4 times in a
> very short time (about 10 ms apart), it hit the FailureRateRestartStrategy
> limit and the whole job quit; we got the message
> 'org.apache.flink.runtime.JobException: Recovery is suppressed by
> FailureRateRestartBackoffTimeStrategy'.
> As I understand from the documentation, the expected behavior was that the
> jobmanager should try to restart the job, which would bring up a new
> taskmanager on another machine, but it did not.
> We also did some tests: we started a new job and simply killed the
> taskmanager, and it restarted as expected.
>
> This confuses us the most; if anyone knows what happened, that would be
> appreciated.
>
> JobManager and TaskManager logs are appended below.
>




Re: [state.checkpoints.num-retained ]The default value does not take effect

2021-12-17 Thread Alexander Preuß
Hi Chenqizhu,

If you believe what you have found is a bug, please file an issue of type
bug in the ASF Jira (https://issues.apache.org/jira/projects/FLINK/issues)
and select 1.13.3 as the affected version.


Best regards,
Alexander

On Fri, Dec 17, 2021 at 8:56 AM chenqizhu  wrote:

> hi,
>
>    The configuration takes effect only when I specify
> -Dstate.checkpoints.num-retained=n on the command line interface.
>    If I do not specify this configuration, the default value does not take
> effect. Is it a bug?
>
>my flink version : flink-1.13.3
>
>
>
>




Re: Information request: Reactive mode and Rescaling

2021-12-16 Thread Alexander Preuß
Hi Morgan,

Regarding your first question, if the Kafka connector is configured to use
exactly-once semantics it will check the offsets of partitions when
recovering from the checkpoint, so there will be no data loss or
duplication.
I'm not quite sure I understood the second part of the first question
regarding the HPA, are you asking if Flink is actively triggering a new
checkpoint before the pod shutdown? If so then the answer is no, it will
use the latest completed checkpoint.

For the second question, it will use the latest checkpoint, there is no
'active'/forced creation of a new checkpoint before the re-scaling.

You can find some more information about the mechanism here:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/
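
For context, exactly-once checkpointing on the job side is enabled like this
(the interval below is just an arbitrary example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// offsets / transactional state are restored from the latest completed
// checkpoint when reactive mode rescales the job
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);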

Best regards,
Alexander

On Wed, Dec 15, 2021 at 10:24 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Greetings,
>
>
> I would like to find out more about Flink's new reactive mode as well as
> the rescaling feature regarding fault tolerance. For the following question
> lets assume checkpointing is enabled using HDFS.
>
>
> So first question, if I have a job where the source(s) and sink(s) are
> configured to use Kafka with exactly-once processing enabled, how does
> reactive mode handle this? On reconfigurations using the Horizontal Pod
> Autoscaler, does it recover to the latest checkpoint?
>
>
> Second question, for re-scaling, does it automatically create a
> savepoint and then rescale or does it use the latest checkpoint to ensure
> results are consistent during reconfiguration.
>
>
> Thank you in advance!
>
>
> M.
>




Re: pyFlink + asyncio

2021-12-16 Thread Alexander Preuß
Hi Михаил,

From looking at
https://nightlies.apache.org/flink/flink-docs-master/api/python//pyflink.datastream.html
there is currently no AsyncFunction / RichAsyncFunction implementation in
pyFlink, so you are bound to synchronous interaction.

Best regards,
Alexander

On Thu, Dec 16, 2021 at 12:47 PM Королькевич Михаил 
wrote:

> Hi team!
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
> Is it possible to use this with pyFlink?
> Or is there another way to asynchronously enrich an unordered data stream?
>




Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-03 Thread Alexander Preuß
Hi Lars,

What is your use case for the failure handler, just collecting metrics? We
want to remove the configurable failure handler in the new Sink API
implementation of the Elasticsearch connector in Flink 1.15 because it can
be a huge footgun with regards to delivery guarantees.

Best Regards,
Alexander

On Thu, Dec 2, 2021 at 6:23 PM Lars Bachmann 
wrote:

> Hi David,
>
> Thanks for the reply. I think especially in an error/failure handler
> metrics are important in order to have proper monitoring/alerting in such
> cases. Would be awesome if this could be added to Flink at some point :).
>
> Regards,
>
> Lars
>
> Am 02.12.2021 um 18:13 schrieb David Morávek :
>
> Hi Lars,
>
> quickly looking at the ES connector code, I think you're right and there
> is no way to do that :(  In general I'd say that being able to expose
> metrics is a valid request.
>
> I can imagine having some kind of `RichActionRequestFailureHandler` with
> `{get|set}RuntimeContext` methods. More or less the same thing we already
> do with for example the `RichFunction`. This unfortunately requires some
> work on the Flink side.
>
> cc @Arvid
>
> On Thu, Dec 2, 2021 at 5:52 PM  wrote:
>
>> Hi,
>>
>> is there a way to expose custom metrics within an elasticsearch failure
>> handler (ActionRequestFailureHandler)? To register custom metrics I need
>> access to the runtime context but I don't see a way to access the
>> context in the failure handler.
>>
>> Thanks and regards,
>>
>> Lars
>>
>
>


Re: how to run streaming process after batch process is completed?

2021-11-30 Thread Alexander Preuß
Hi Vtygoss,

Can you explain a bit more about your ideal pipeline? Is the batch data
bounded data or could you also process it in streaming execution mode? And
is the streaming data derived from the batch data or do you just want to
ensure that the batch has been finished before running the processing of
the streaming data?

Best Regards,
Alexander

(sending again because I accidentally left out the user ml in the reply on
the first try)

On Tue, Nov 30, 2021 at 12:38 PM vtygoss  wrote:

> Hi, community!
>
>
> With Flink, I want to unify batch processing and stream processing in a data
> production pipeline. A batch job is used to process the inventory data, then a
> streaming job is used to process the incremental data. But I have run into a
> problem: there is no state after the batch job, and the result is wrong if I
> run the streaming job directly.
>
>
> So how can I run the streaming job correctly after the batch job has
> completed? Is there any doc or demo for handling this scenario?
>
>
> Thanks for any reply or suggestion!
>
>
> Best Regards!
>
>
>
>
>


Re: Could not retrieve submitted JobGraph from state handle

2021-11-16 Thread Alexander Preuß
Hi Alexey,

Are you maybe reusing the cluster-id?

Also, could you provide some more information on your setup and a more
complete stacktrace?
The ConfigMap contains pointers to the actual files on Azure.

Best,
Alexander

On Tue, Nov 16, 2021 at 6:14 AM Alexey Trenikhun  wrote:

> Hello,
> We are using Kubernetes HA and Azure Blob storage, and in rare cases I see
> the following error:
>
> Could not retrieve submitted JobGraph from state handle under
> jobGraph-. This indicates that the
> retrieved state handle is broken. Try cleaning the state handle store.
>
> The question is: how exactly can I clean the "state handle store"? Delete the
> fsp-dispatcher-leader ConfigMap? Or some files (which ones) in Azure Blob
> storage?
>
> Thanks,
> Alexey
>