Re: Stopping a job

2020-06-06 Thread M Singh
 
Hi Arvid:   
Thanks for the links.  
A few questions:
1. Is there any particular interface in 1.9+ that identifies the source as
stoppable?
2. Is there any distinction between stop and cancel in 1.9+?
3. Is there any list of sources which are documented as stoppable besides the
ones listed in your SO link?
4. In 1.9+ there is a flink stop command and a flink cancel command
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop). So
it appears that flink stop takes a savepoint and then calls cancel, while
cancel just cancels the job (it looks like cancel with savepoint is deprecated
in 1.10; see the sketch below).
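For reference, a minimal sketch of the programmatic equivalents of the two CLI
commands, assuming Flink 1.10's ClusterClient API; the host, port, job id
argument and savepoint path below are placeholders:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

public class StopVsCancel {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // placeholder REST endpoint
        conf.setInteger(RestOptions.PORT, 8081);

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "default")) {
            JobID jobId = JobID.fromHexString(args[0]);

            // Equivalent of `flink stop`: trigger a savepoint, then shut the sources down.
            String savepoint = client
                    .stopWithSavepoint(jobId, false, "s3://my-bucket/savepoints")
                    .get();
            System.out.println("Stopped with savepoint at " + savepoint);

            // Equivalent of `flink cancel`: terminate the job without a savepoint.
            // client.cancel(jobId).get();
        }
    }
}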
Thanks again for your help.


On Saturday, June 6, 2020, 02:18:57 PM EDT, Arvid Heise 
 wrote:  
 
 Yes, it seems as if FlinkKinesisConsumer does not implement it.
Here are the links to the respective javadoc [1] and code [2]. Note that in
later releases (1.9+) this interface has been removed. Stop is now implemented
through a cancel() on source level.
In general, I don't think that in a Kinesis-to-Kinesis use case, stop is needed
anyway, since there is no additional consistency expected over a normal cancel.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
[2]
https://github.com/apache/flink/blob/release-1.6/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
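To illustrate the source-level contract described above, a minimal sketch of a
custom SourceFunction whose cancel() ends the run() loop; the record type and
fetch logic are placeholders, not the Kinesis consumer's actual code:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = fetchNextRecord(); // stand-in for polling the external system
            if (record != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            } else {
                Thread.sleep(100);
            }
        }
    }

    @Override
    public void cancel() {
        // Called on stop/cancel; run() then leaves its loop and the task winds down.
        running = false;
    }

    private String fetchNextRecord() {
        return null; // placeholder
    }
}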
On Sat, Jun 6, 2020 at 8:03 PM M Singh  wrote:

 Hi Arvid:
I checked the link and it indicates that only Storm SpoutSource, TwitterSource
and NifiSource support stop.
Does this mean that FlinkKinesisConsumer is not stoppable?
Also, can you please point me to the Stoppable interface mentioned in the link?
I found the following but am not sure if TwitterSource implements it:
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
Thanks




On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise  
wrote:  
 
 Hi,
could you check if this SO thread [1] helps you already?
[1] 
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:

Hi:
I am running a job which consumes data from Kinesis and sends data to another
Kinesis queue. I am using an older version of Flink (1.6), and when I try to
stop the job I get an exception:

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP)
failed: This job is not stoppable.]

I wanted to find out what a stoppable job is and whether it is possible to make
a job stoppable if it is reading/writing to Kinesis?
Thanks





-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      


-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      

Flink not restoring from checkpoint when job manager fails even with HA

2020-06-06 Thread Kathula, Sandeep
Hi,
We are running Flink in K8s. We used
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/jobmanager_high_availability.html
to set up high availability. We set the max number of retries for a task to 2.
After the task fails twice, the job manager fails. This is expected. But it is
removing the checkpoint from ZooKeeper. As a result, on restart it is not
consuming from the previous checkpoint. We are losing data.
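This does not change the ZooKeeper cleanup on a globally terminal failure, but
one common mitigation is to retain externalized checkpoints and allow automatic
restarts before the job reaches the FAILED state; a minimal sketch, assuming
Flink 1.10's CheckpointConfig and RestartStrategies APIs (intervals and attempt
counts are placeholders):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s and keep the checkpoint data on S3 even when the job
        // reaches a terminal state, so it can be restored manually with -s <path>.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Allow two automatic task restarts before the job fails globally.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(30)));

        // ... build the pipeline and call env.execute() here.
    }
}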

Logs:

2020/06/06 19:39:07.759 INFO  o.a.f.r.c.CheckpointCoordinator - Stopping 
checkpoint coordinator for job .
2020/06/06 19:39:07.759 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Shutting down
2020/06/06 19:39:07.823 INFO  o.a.f.r.z.ZooKeeperStateHandleStore - Removing 
/flink/sessionization_test4/checkpoints/ from 
ZooKeeper
2020/06/06 19:39:07.823 INFO  o.a.f.r.c.CompletedCheckpoint - Checkpoint with 
ID 11 at 
's3://s3_bucket/sessionization_test/checkpoints//chk-11'
 not discarded.
2020/06/06 19:39:07.829 INFO  o.a.f.r.c.ZooKeeperCheckpointIDCounter - Shutting 
down.
2020/06/06 19:39:07.829 INFO  o.a.f.r.c.ZooKeeperCheckpointIDCounter - Removing 
/checkpoint-counter/ from ZooKeeper
2020/06/06 19:39:07.852 INFO  o.a.f.r.dispatcher.MiniDispatcher - Job 
 reached globally terminal state FAILED.
2020/06/06 19:39:07.852 INFO  o.a.f.runtime.jobmaster.JobMaster - Stopping the 
JobMaster for job 
sppstandardresourcemanager-flink-0606193838-6d7dae7e().
2020/06/06 19:39:07.854 INFO  o.a.f.r.entrypoint.ClusterEntrypoint - Shutting 
StandaloneJobClusterEntryPoint down with application status FAILED. Diagnostics 
null.
2020/06/06 19:39:07.854 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Shutting 
down rest endpoint.
2020/06/06 19:39:07.856 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2020/06/06 19:39:07.859 INFO  o.a.f.r.j.slotpool.SlotPoolImpl - Suspending 
SlotPool.
2020/06/06 19:39:07.859 INFO  o.a.f.runtime.jobmaster.JobMaster - Close 
ResourceManager connection d28e9b9e1fc1ba78c2ed010070518057: JobManager is 
shutting down..
2020/06/06 19:39:07.859 INFO  o.a.f.r.j.slotpool.SlotPoolImpl - Stopping 
SlotPool.
2020/06/06 19:39:07.859 INFO  o.a.f.r.r.StandaloneResourceManager - Disconnect 
job manager 
afae482ff82bdb26fe275174c14d4...@akka.tcp://flink@flink-job-cluster:6123/user/jobmanager_0
 for job  from the resource manager.
2020/06/06 19:39:07.860 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - 
Stopping ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader//job_manager_lock'}.
2020/06/06 19:39:07.868 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Removing 
cache directory /tmp/flink-web-ef940924-348b-461c-ab53-255a914ed43a/flink-web-ui
2020/06/06 19:39:07.870 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - 
Stopping ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
2020/06/06 19:39:07.870 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Shut down 
complete.
2020/06/06 19:39:07.870 INFO  o.a.f.r.r.StandaloneResourceManager - Shut down 
cluster because application is in FAILED, diagnostics null.
2020/06/06 19:39:07.870 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2020/06/06 19:39:07.871 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopping 
dispatcher akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.
2020/06/06 19:39:07.871 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopping all 
currently running jobs of dispatcher 
akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.
2020/06/06 19:39:07.871 INFO  o.a.f.r.r.s.SlotManagerImpl - Closing the 
SlotManager.
2020/06/06 19:39:07.871 INFO  o.a.f.r.r.s.SlotManagerImpl - Suspending the 
SlotManager.
2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - 
Stopping ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Stopping ZooKeeperLeaderRetrievalService 
/leader//job_manager_lock.
2020/06/06 19:39:07.974 INFO  o.a.f.r.r.h.l.b.StackTraceSampleCoordinator - 
Shutting down stack trace sample coordinator.
2020/06/06 19:39:07.975 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService - 
Stopping ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2020/06/06 19:39:07.975 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopped 
dispatcher 

Failed to deserialize Avro record

2020-06-06 Thread Ramana Uppala
We are using AvroRowDeserializationSchema with the Kafka table source to
deserialize the messages. The application failed with "Failed to deserialize
Avro record." for different messages, it seems.

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is 
negative: -26

Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
at 
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424) 
~[avro-1.8.2.jar:1.8.2]

We are not sure what serialization mechanism the producer is using to publish
the messages at this time. But are the above errors related to
https://issues.apache.org/jira/browse/FLINK-16048 ?

Any suggestions on fixing the above issues? We are using Flink 1.10.
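The stack traces above are typical when the bytes on the topic are not plain
Avro binary; for example, records produced with the Confluent Avro serializer
carry a 5-byte registry header (magic byte 0x0 plus a 4-byte schema id) that
AvroRowDeserializationSchema does not strip. A minimal diagnostic sketch for
checking a raw record value for that framing (not part of the original setup):

import java.nio.ByteBuffer;

public final class ConfluentFramingCheck {

    // Returns true if the value starts with the Confluent wire-format header.
    public static boolean looksConfluentFramed(byte[] value) {
        if (value == null || value.length < 5 || value[0] != 0x0) {
            return false;
        }
        int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
        return schemaId >= 0; // registry schema ids are non-negative
    }

    private ConfluentFramingCheck() {
    }
}

If the producer does use the registry framing, a registry-aware deserializer
(or stripping the 5-byte header before decoding) would be needed instead of
the plain Avro row schema.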


Re: Stopping a job

2020-06-06 Thread Arvid Heise
Yes, it seems as if FlinkKinesisConsumer does not implement it.

Here are the links to the respective javadoc [1] and code [2]. Note that in
later releases (1.9+) this interface has been removed. Stop is now
implemented through a cancel() on source level.

In general, I don't think that in a Kinesis-to-Kinesis use case, stop is
needed anyway, since there is no additional consistency expected over a
normal cancel.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
[2]
https://github.com/apache/flink/blob/release-1.6/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java

On Sat, Jun 6, 2020 at 8:03 PM M Singh  wrote:

> Hi Arvid:
>
> I checked the link and it indicates that only Storm SpoutSource,
> TwitterSource and NifiSource support stop.
>
> Does this mean that FlinkKinesisConsumer is not stoppable ?
>
> Also, can you please point me to the Stoppable interface mentioned in the
> link? I found the following but am not sure if TwitterSource implements
> it:
>
> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
>
> Thanks
>
>
>
>
>
> On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise 
> wrote:
>
>
> Hi,
>
> could you check if this SO thread [1] helps you already?
>
> [1]
> https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
>
> On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:
>
> Hi:
>
> I am running a job which consumes data from Kinesis and sends data to
> another Kinesis queue. I am using an older version of Flink (1.6), and
> when I try to stop the job I get an exception
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Job termination
> (STOP) failed: This job is not stoppable.]
>
>
> I wanted to find out what a stoppable job is and whether it is possible to
> make a job stoppable if it is reading/writing to Kinesis?
>
> Thanks
>
>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Stopping a job

2020-06-06 Thread M Singh
 Hi Arvid:
I checked the link and it indicates that only Storm SpoutSource, TwitterSource
and NifiSource support stop.
Does this mean that FlinkKinesisConsumer is not stoppable?
Also, can you please point me to the Stoppable interface mentioned in the link?
I found the following but am not sure if TwitterSource implements it:
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
Thanks




On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise  
wrote:  
 
 Hi,
could you check if this SO thread [1] helps you already?
[1] 
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:

Hi:
I am running a job which consumes data from Kinesis and sends data to another
Kinesis queue. I am using an older version of Flink (1.6), and when I try to
stop the job I get an exception:

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP)
failed: This job is not stoppable.]

I wanted to find out what a stoppable job is and whether it is possible to make
a job stoppable if it is reading/writing to Kinesis?
Thanks





-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      

Re: flink 1.9 ????????????????

2020-06-06 Thread Sun.Zhu
Hi star,
Regarding upsert support for the Kafka connector, see [1].

[1] https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA

Sun.Zhu
17626017...@163.com

On 2020-06-03 14:47, star <3149768...@qq.com> wrote:

(Original question, partly garbled: it concerns converting a table with
toRetractStream in Flink and writing the resulting retract stream to Kafka.)






Re: Flink s3 streaming performance

2020-06-06 Thread venkata sateesh` kolluru
Thanks Arvid!

Will try to increase the property you recommended and will post the update.

On Sat, Jun 6, 2020, 7:33 AM Arvid Heise  wrote:

> Hi Venkata,
>
> you can find them on the Hadoop AWS page (we are just using it as a
> library) [1].
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration
>
> On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <
> vkollur...@gmail.com> wrote:
>
>> Hi Kostas and Arvid,
>>
>> Thanks for your suggestions.
>>
>> The small files were already created and I am trying to roll a few into a
>> big file while sinking. But due to the custom bucket assigner, it is hard
>> to get more files within the same prefix in the specified checkpointing
>> time.
>>
>> For example:
>> /prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
>> The checkpointing interval is 5 minutes. prefix1 has 40 different values and
>> prefix2 has 1+ values.
>> Within the 5-minute interval, we get no more than 5-10 part files in these
>> prefixes.
>>
>> Regarding PrintStream, I will figure out how to use SimpleStringEncoder on
>> a Tuple, as I only need to write the tuple.f2 element to the file. If you can
>> guide me on how to do it, I would appreciate it.
>>
>> Will try Arvid's suggestion on increasing fs.s3a.connection.maximum. I
>> was trying to find information about these parameters and couldn't find it
>> anywhere. Is there a place where I could look at the list of these config
>> params?
>>
>> Also, I am using s3:// as the prefix; would fs.s3a.connection.maximum affect
>> that too, or is there a separate param like fs.s3.connection.maximum?
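As an aside on the SimpleStringEncoder question above, a minimal sketch of a
row-format Encoder that writes only the f2 element, assuming the sink records
are Tuple3<String, String, String>; the class name is illustrative:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple3;

public class F2OnlyEncoder implements Encoder<Tuple3<String, String, String>> {

    @Override
    public void encode(Tuple3<String, String, String> element, OutputStream stream) throws IOException {
        // Write only the payload element, one line per record; f0/f1 stay available
        // to the bucket assigner for building the prefix.
        stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }
}

It could then be used with StreamingFileSink.forRowFormat(path, new F2OnlyEncoder())
together with the existing custom bucket assigner.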
>>
>> On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas  wrote:
>>
>>> Hi all,
>>>
>>> @Venkata, Do you have many small files being created as Arvid suggested?
>>> If yes, then I tend to agree that S3 is probably not the best sink.
>>> Although I did not get that from your description.
>>> In addition, instead of PrintStream you can have a look at the code of
>>> the SimpleStringEncoder in Flink [1] for a bit more efficient
>>> implementation.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>>>
>>>
>>> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise  wrote:
>>>
 Hi Venkata,

 are the many small files intended or is it rather an issue of our
 commit on checkpointing? If so then FLINK-11499 [1] should help you. Design
 is close to done, unfortunately implementation will not make it into 1.11.

 In any case, I'd look at the parameter fs.s3a.connection.maximum, as
 you store both state and data on S3. I'd probably go with slot*3 or even
 higher.

 Lastly, the way you output elements looks also a bit suspicious.
 PrintStream is not known for great performance. I'm also surprised that it
 works without manual flushing.

 [1] https://issues.apache.org/jira/browse/FLINK-11499

 On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke 
 wrote:

> I think S3 is a wrong storage backend for this volumes of small
> messages.
> Try to use a NoSQL database or write multiple messages into one file
> in S3 (1 or 10)
>
> If you still want to go with your scenario then try a network
> optimized instance and use s3a in Flink and configure s3 entropy.
>
> Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <
> vkollur...@gmail.com>:
>
> 
> Hi David,
>
> The avg size of each file is around 30KB and I have a checkpoint
> interval of 5 minutes. Some files are even 1 KB; because of checkpointing,
> some files are merged into one big file of around 300MB.
>
> With 120 million files and 4Tb, if the rate of transfer is 300 per
> minute, it is taking weeks to write to s3.
>
> I have tried to increase the parallelism of the sink but I don't see any
> improvement.
>
> The sink record is a Tuple3; the actual content of the
> file is f2. This content is written to /f0/f1/part*-*
>
> I guess the prefix determination in the custom bucket assigner won't be
> causing this delay?
>
> Could you please shed some light on writing custom s3 sink ?
>
> Thanks
>
>
> On Sun, May 31, 2020, 6:34 AM David Magalhães 
> wrote:
>
>> Hi Venkata.
>>
>> 300 requests per minute looks like 200ms per request, which should
>> be a normal response time to send a file if there isn't any speed
>> limitation (how big are the files?).
>>
>> Have you changed the parallelism to be higher than 1? I also
>> recommend limiting the source parallelism, because it can consume
>> pretty fast from Kafka and create some kind of backpressure.
>>
>> I don't have much experience with StreamingFileSink, because I've
>> ended up using a custom S3Sink, but I did have some issues writing to S3
>> because the 

Re: Run command after Batch is finished

2020-06-06 Thread Jeff Zhang
It would run in the client side where ExecutionEnvironment is created.
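For illustration, a minimal sketch of registering such a listener on the
ExecutionEnvironment; the lock-release method is a placeholder for the caller's
own cleanup code:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

public class BatchWithCleanup {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Called on the client once the job has been submitted.
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // Called on the client when the job result arrives, whether it
                // succeeded (result != null) or failed (throwable != null).
                releaseDistributedLock();
            }
        });

        // Stand-in pipeline; the real job defines its own sources and outputs.
        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .output(new DiscardingOutputFormat<>());

        env.execute("batch job with cleanup");
    }

    private static void releaseDistributedLock() {
        // placeholder for the actual lock release
    }
}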

On Saturday, June 6, 2020 8:14 PM, Mark Davis wrote:

> Hi Jeff,
>
> Thank you very much! That is exactly what I need.
>
> Where will the listener code run in a cluster deployment (YARN, K8s)?
> Will it be sent over the network?
>
> Thank you!
>
>   Mark
>
> ‐‐‐ Original Message ‐‐‐
> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>
> You can try JobListener which you can register to ExecutionEnvironment.
>
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
>
> On Saturday, June 6, 2020 12:00 AM, Mark Davis wrote:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code (e.g. release a distributed lock) after
>> all outputs are finished?
>>
>> Currently I do this in a try-finally block around
>> ExecutionEnvironment.execute() call, but I have to switch to the detached
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>>   Mark
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


flink 1.9 integration with pushgateway and prometheus: version question

2020-06-06 Thread guanyq
Could anyone tell me which versions of pushgateway and prometheus correspond to
Flink 1.9.0?

Re: Run command after Batch is finished

2020-06-06 Thread Mark Davis
Hi Jeff,

Thank you very much! That is exactly what I need.

Where will the listener code run in a cluster deployment (YARN, K8s)?
Will it be sent over the network?

Thank you!

  Mark

‐‐‐ Original Message ‐‐‐
On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:

> You can try JobListener which you can register to ExecutionEnvironment.
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
> On Saturday, June 6, 2020 12:00 AM, Mark Davis wrote:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code (e.g. release a distributed lock) after all 
>> outputs are finished?
>>
>> Currently I do this in a try-finally block around 
>> ExecutionEnvironment.execute() call, but I have to switch to the detached 
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>>   Mark
>
> --
> Best Regards
>
> Jeff Zhang

Re: Flink s3 streaming performance

2020-06-06 Thread Arvid Heise
Hi Venkata,

you can find them on the Hadoop AWS page (we are just using it as a
library) [1].

[1]
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration

On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <
vkollur...@gmail.com> wrote:

> Hi Kostas and Arvid,
>
> Thanks for your suggestions.
>
> The small files were already created and I am trying to roll a few into a
> big file while sinking. But due to the custom bucket assigner, it is hard
> to get more files within the same prefix in the specified checkpointing
> time.
>
> For example:
> /prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
> The checkpointing interval is 5 minutes. prefix1 has 40 different values and
> prefix2 has 1+ values.
> Within the 5-minute interval, we get no more than 5-10 part files in these
> prefixes.
>
> Regarding PrintStream, I will figure out how to use SimpleStringEncoder on a
> Tuple, as I only need to write the tuple.f2 element to the file. If you can
> guide me on how to do it, I would appreciate it.
>
> Will try Arvid's suggestion on increasing fs.s3a.connection.maximum. I was
> trying to find information about these parameters and couldn't find it
> anywhere. Is there a place where I could look at the list of these config
> params?
>
> Also, I am using s3:// as the prefix; would fs.s3a.connection.maximum affect
> that too, or is there a separate param like fs.s3.connection.maximum?
>
> On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas  wrote:
>
>> Hi all,
>>
>> @Venkata, Do you have many small files being created as Arvid suggested?
>> If yes, then I tend to agree that S3 is probably not the best sink.
>> Although I did not get that from your description.
>> In addition, instead of PrintStream you can have a look at the code of
>> the SimpleStringEncoder in Flink [1] for a bit more efficient
>> implementation.
>>
>> Cheers,
>> Kostas
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>>
>>
>> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise  wrote:
>>
>>> Hi Venkata,
>>>
>>> are the many small files intended or is it rather an issue of our commit
>>> on checkpointing? If so then FLINK-11499 [1] should help you. Design is
>>> close to done, unfortunately implementation will not make it into 1.11.
>>>
>>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as
>>> you store both state and data on S3. I'd probably go with slot*3 or even
>>> higher.
>>>
>>> Lastly, the way you output elements looks also a bit suspicious.
>>> PrintStream is not known for great performance. I'm also surprised that it
>>> works without manual flushing.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>>
>>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke  wrote:
>>>
 I think S3 is a wrong storage backend for this volumes of small
 messages.
 Try to use a NoSQL database or write multiple messages into one file in
 S3 (1 or 10)

 If you still want to go with your scenario then try a network optimized
 instance and use s3a in Flink and configure s3 entropy.

 Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <
 vkollur...@gmail.com>:

 
 Hi David,

 The avg size of each file is around 30KB and I have a checkpoint interval
 of 5 minutes. Some files are even 1 KB; because of checkpointing, some files
 are merged into one big file of around 300MB.

 With 120 million files and 4Tb, if the rate of transfer is 300 per
 minute, it is taking weeks to write to s3.

 I have tried to increase the parallelism of the sink but I don't see any
 improvement.

 The sink record is a Tuple3; the actual content of the
 file is f2. This content is written to /f0/f1/part*-*

 I guess the prefix determination in the custom bucket assigner won't be
 causing this delay?

 Could you please shed some light on writing custom s3 sink ?

 Thanks


 On Sun, May 31, 2020, 6:34 AM David Magalhães 
 wrote:

> Hi Venkata.
>
> 300 requests per minute looks like 200ms per request, which should be
> a normal response time to send a file if there isn't any speed limitation
> (how big are the files?).
>
> Have you changed the parallelism to be higher than 1? I also
> recommend limiting the source parallelism, because it can consume
> pretty fast from Kafka and create some kind of backpressure.
>
> I don't have much experience with StreamingFileSink, because I've ended
> up using a custom S3Sink, but I did have some issues writing to S3 because
> the request wasn't parallelised. Check this thread,
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>
> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
> vkollur...@gmail.com> wrote:
>
>> 

Flink Stream job to parquet sink

2020-06-06 Thread aj
Hello All,

I am receiving a set of events in Avro format on different topics. I want
to consume these and write them to S3 in Parquet format.
I have written the job below, which creates a different stream for each event
and fetches its schema from the Confluent schema registry to create a
Parquet sink for that event.
This is working fine, but the only problem I am facing is that whenever a new
event starts coming in, I have to change the YAML config and restart the job.
Is there any way to avoid restarting the job when it should start consuming a
new set of events?


YAML config :

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"

checkPointInterval: 120

topics: ["search_list_keyless", "bike_search_details", "analytics-keyless"]





*Sink code :*

YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig =
        reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();

List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();

CachedSchemaRegistryClient registryClient =
        new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, using the Avro schema fetched from the registry.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Route only the records belonging to this event type into its sink.
        DataStream<GenericRecord> outStream =
                dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord != null &&
                        genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}




-- 
Thanks & Regards,
Anuj Jain






Data Quality Library in Flink

2020-06-06 Thread aj
Hello All,

I want to do some data quality analysis on stream data, for example:

1. Fill rate in a particular column
2. How many events are going to the error queue due to Avro schema
validation failures?
3. Different statistics measures of a column.
4. Alert if a particular threshold is breached (like if the fill rate is less
than 90% for a column)

Is there any library that exists on top of Flink for data quality? As far as I
can see, there is a library on top of Spark:
https://github.com/awslabs/deequ

This provides all that I am looking for.
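As a point of comparison, item 4 above (fill-rate alerting) can be hand-rolled
with plain DataStream operators; a minimal sketch, assuming a simple POJO with
a nullable column, where all names and thresholds are illustrative:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FillRateAlertJob {

    /** Illustrative event with a column that may be missing. */
    public static class Event {
        public String email;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(new Event(), new Event()); // stand-in source

        DataStream<String> alerts = events
                // (filledCount, totalCount) per event
                .map(e -> Tuple2.of(e.email != null ? 1L : 0L, 1L))
                .returns(Types.TUPLE(Types.LONG, Types.LONG))
                // aggregate the counts over 1-minute windows
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce((a, b) -> Tuple2.of(a.f0 + b.f0, a.f1 + b.f1))
                // alert when the fill rate drops below 90%
                .filter(t -> t.f1 > 0 && t.f0.doubleValue() / t.f1 < 0.9)
                .map(t -> "Fill rate for 'email' below 90%: " + t.f0 + "/" + t.f1);

        alerts.print();
        env.execute("fill-rate alert");
    }
}

The same pattern extends to other column statistics by swapping the per-event
mapping and the window aggregate.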

-- 
Thanks & Regards,
Anuj Jain