Hi Erik,
I am still not able to understand the reason behind this exception.
Is this exception causing the failure and restart of the job, or is it
occurring after the failure/restart is triggered?
Thanks
Sohi
Hi Andrey,
I am using AvroSinkWriter (with BucketingSink) with compression enabled.
It looks like StreamingFileSink does not have direct support for
AvroSinkWriter. A sequence file format exists for StreamingFileSink, but
it looks like it rolls files on every checkpoint (OnCheckpointRollingPolicy).
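For reference, a minimal sketch of the kind of rolling control I mean (assuming a Flink 1.6+ StreamingFileSink; the path and thresholds are placeholders). The row-format builder accepts a custom rolling policy, while the bulk-format builders, which an Avro or sequence-file writer would have to use, appear tied to OnCheckpointRollingPolicy:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs:///data/out"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.create()
        .withRolloverInterval(15 * 60 * 1000L)   // roll at most every 15 minutes
        .withInactivityInterval(5 * 60 * 1000L)  // or after 5 minutes of inactivity
        .withMaxPartSize(128L * 1024 * 1024)     // or once a part file reaches 128 MB
        .build())
    .build();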
Thanks Andrey.
Yeah, will upgrade and see if the same gets reproduced.
-Sohi
Hi Erik,
Are you suggesting all the options together?
Which version of Flink has this solved? I am currently using 1.5.5.
-Thanks
Sohi
Any help?
Thanks Zhijiang.
Sorry to ask again: are both sets of heartbeats implementing the same
feature?
If yes, which one has the highest priority for detecting failure?
If no, can you explain a little more or point me to some references to
understand the difference?
Thanks
Sohi
Yes, Konstantin Knauf-2. You are right.
Hi,
Which version of Flink are you using?
To reset the offset to earliest:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
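For reference, a minimal sketch of resetting the consumer to the earliest offset (topic, servers, and group id are placeholders; note the start position only applies when the job is not restored from a checkpoint/savepoint):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), props);
consumer.setStartFromEarliest();  // ignore committed offsets, read from the beginning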
Thanks
Hi,
In the
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html
link, two sets of heartbeat configs are mentioned:
akka.watch.heartbeat.interval
akka.watch.heartbeat.pause
vs.
heartbeat.interval
heartbeat.timeout
Can you please explain what exactly is the difference between
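For context, a sketch of how both families of keys can be set (the values shown are the documented defaults as I understand them; in a real deployment these go into flink-conf.yaml, here set programmatically for a local environment):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Flink runtime heartbeat between JobManager/ResourceManager and TaskManagers
conf.setString("heartbeat.interval", "10000");  // ms
conf.setString("heartbeat.timeout", "50000");   // ms
// Akka DeathWatch heartbeat at the actor-system/transport level
conf.setString("akka.watch.heartbeat.interval", "10 s");
conf.setString("akka.watch.heartbeat.pause", "60 s");

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(1, conf);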
Hi David,
We are also running streaming jobs over Kafka source .
Yes : Consumer Group Id needs to be set for Kafka source explicitly t .
We are also using checkpointing and save points for persisting state . Any
time we change group id it starts from latest offset(default Kafka connector
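For reference, a minimal sketch of how we set it (names are placeholders). With setStartFromGroupOffsets (the default), a brand-new group id has no committed offsets, so Kafka's auto.offset.reset property (default "latest") decides the start position, which matches the behavior described above:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-streaming-job");  // set explicitly per job

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), props);
consumer.setStartFromGroupOffsets();  // default: start from the group's committed offsets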
Hi,
I am using Flink 1.5.5. I have a streaming job with 25 * 6 (150) parallelism.
I am facing too-frequent heartbeat timeouts, even during off-peak hours,
which should rule out memory issues.
I also enabled debug logs for Flink and observed that the heartbeat request
is getting triggered every 5 seconds.
Hi Team,
Any help/update on this?
This is still an issue where I am using the bucketing sink in production.
Thanks
Sohi
Hi,
Let's say I have a Flink Kafka consumer reading from 3 topics, [T-1, T-2, T-3]:
- T-1 and T-2 each have 100 partitions
- T-3 has 60 partitions
- Flink task parallelism is 50
How will Flink prioritize the Kafka topics?
If T-3 has more lag than the other topics, will Flink
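For context, a sketch of the multi-topic subscription in question (schema and properties are placeholders). As far as I understand, the 260 partitions are simply distributed across the 50 consumer subtasks; the consumer does not prioritize one topic over another or take lag into account:

import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "multi-topic-job");

// One consumer subscribed to all three topics
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    Arrays.asList("T-1", "T-2", "T-3"), new SimpleStringSchema(), props);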
Hi,
Yes, the issue was with the Bucketing Sink. I removed it and replaced the
sink with a Kafka sink, and it worked fine.
What could be causing
TimerException{java.nio.channels.ClosedByInterruptException}
at
Hi Andrey,
Yes, setting setFailOnCheckpointingErrors(false) solved the problem.
But in between I am getting this error:
2019-01-16 21:07:26,979 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
- Implementation error: Unhandled exception.
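For reference, a minimal sketch of the setting that solved it (the checkpoint interval is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);  // checkpoint every minute
// Decline a failed checkpoint instead of failing the whole job
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);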
Hi Andrey,
Yes, CustomBucketingSink is a custom class copied from BucketingSink itself.
A few changes were added:
1. Add a timestamp in part files
2. A few logging statements
Note: it looks like I copied it from version 1.4 (I don't know if that
could be the reason for the failure).
Did it override
Hi Andrey,
Please find the logs. Attaching Dropbox links as the logs are large.
Job Manager: https://www.dropbox.com/s/q0rd60coydupl6w/full.log.gz?dl=0
Application:
https://www.dropbox.com/s/cn3yrd273wd99f2/jm-sohan.log.gz?dl=0
Thanks
Sohi
Yes, the file got deleted.
2019-01-15 10:40:41,360 INFO FSNamesystem.audit: allowed=true ugi=hdfs
(auth:SIMPLE) ip=/192.168.3.184 cmd=delete
src=/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
dst=null perm=null
Hi,
Flink - 1.5.5
My streaming job checkpoints every minute. I am getting the following
exception:
2019-01-15 01:59:04,680 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 469 for job e9a08c0661a6c31b5af540cf352e1265 (2736 bytes in 124
ms).
2019-01-15
Hi,
Any update/help please?
Hi Team,
I am facing an issue with a custom partitioner in Flink streaming. I am
using a watcher to read files from a folder, and then I have to partition
the records and send them to a sink.
- This happens if parallelism > 1.
- Checkpointing is enabled.
- If I don't use the partitioner, then everything
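For context, a minimal sketch of the watcher-style source I mean (assuming env.readFile with PROCESS_CONTINUOUSLY; the path and scan interval are placeholders):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TextInputFormat format = new TextInputFormat(new Path("file:///data/in"));
// Re-scan the folder every 10 seconds and emit records from new files
DataStream<String> records = env.readFile(
    format, "file:///data/in", FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);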
Hi Stefan,
Attaching logs.
You can search for "2019-01-09 19:34:44,170 INFO
org.apache.flink.runtime.taskmanager.Task - Attempting
to cancel task Source:" in the first 2 log files.
f3-part-aa.gz
Hi,
I am running a Flink streaming job on version 1.5.5.
- The job basically reads from Kafka, windows on 2 minutes, and writes to
HDFS using an Avro Bucketing Sink.
- The job runs with parallelism 132.
- Checkpointing is enabled with an interval of 1 minute.
- Savepoints are enabled and getting
Can anyone please help?
Can anyone help?
Hi,
While running a Flink streaming job, it requests more resources from YARN
than specified. I am configuring 17 TMs, but it is requesting more than
35 containers from YARN.
This happens on all versions greater than 1.4.0.
Attaching JM logs.
logs.zip
Hi,
Flink is requesting more containers from YARN than specified. I am using
17 TMs and 3 slots, but at startup it acquires more than 35 TMs and then
releases them after some time.
I have attached the JM debug logs. Not sure what the issue could be.
logs.zip
Hi,
I have installed Flink 1.7.0 with Hadoop 2.7 and Scala 2.11. We are using
the Hortonworks Hadoop distribution (hdp/2.6.1.0-129/).
*Flink lib folder looks like:*
-rw-r--r-- 1 hdfs hadoop 93184216 Nov 29 02:15 flink-dist_2.11-1.7.0.jar
-rw-r--r-- 1 hdfs hadoop    79219 Nov 29 03:33
Thanks. It solved the problem.
Let's assume I have a YARN cluster with 3 nodes and 3 vcores per node, so
the total available cores = 9.
Now if I start a Flink job with taskmanagers = 3 and number of slots per
task manager = 2, what will happen?
1. 3 JVMs will be initiated (one per task manager)
2. Each JVM will run 2 threads for tasks
Hi Hequn,
I tried the following:
Configuration conf = new Configuration();
conf.setString("state.checkpoints.dir","file:///home/sohanvir/Desktop/flink/checkpoints2");
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(1,conf);
Hi,
I am using the following in my code:
1. Flink 1.4
2. Running the example in an IDE
3. Exactly-once semantics enabled
4. Window aggregation
5. Checkpointing enabled at 20 sec
6. RocksDB as the state backend
Workflow:
Kafka Source -> map -> keyBy -> Window(60 Sec) -> ApplyFunction ->
Aggregated Record to
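For reference, a minimal sketch of the workflow above (assuming Flink 1.4 APIs; the topic, paths, and the counting window function are placeholders, and this would sit inside a main(...) throws Exception):

import java.util.Properties;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(20_000L, CheckpointingMode.EXACTLY_ONCE);  // every 20 sec
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/checkpoints"));

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "window-agg");

env.addSource(new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props))
   .keyBy((KeySelector<String, String>) value -> value)  // placeholder key selector
   .timeWindow(Time.seconds(60))
   .apply(new WindowFunction<String, String, String, TimeWindow>() {
       @Override
       public void apply(String key, TimeWindow window,
                         Iterable<String> input, Collector<String> out) {
           long count = 0;
           for (String ignored : input) { count++; }
           out.collect(key + " -> " + count);  // the aggregated record
       }
   })
   .print();
env.execute("window-aggregation");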
Hi Amit,
Thanks for the response. Meanwhile I figured out the issue.
I had a /class X extending RichMapFunction/ and this class was preparing
some heavy data required for the map function. I just moved that code to
the *open()* method and it worked fine.
So I have one doubt: was it because Flink was
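For reference, a minimal sketch of the pattern that fixed it (class and data names are hypothetical): heavy initialization goes into open(), which runs once per parallel instance on the task manager, instead of into the constructor or field initializers, which run on the client and get serialized with the function:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class X extends RichMapFunction<String, String> {
    private transient Map<String, String> heavyData;  // built on the worker, never shipped

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        heavyData = new HashMap<>();
        // ... load the heavy reference data here, once per subtask ...
    }

    @Override
    public String map(String value) {
        return heavyData.getOrDefault(value, value);
    }
}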
Hi,
I am running a Flink batch job.
My job runs fine if I use 4 task managers and 8 slots = 32 parallelism,
with 6 GB of memory per task manager.
As soon as I increase the task managers to 5 with 6 tasks per task
manager = 30 parallelism (6 GB memory per task manager),
I get an OOM error. I am not
Hi,
I have a file in HDFS in the format file.snappy.parquet. Can someone
please point me to / help with a code example for reading Parquet files?
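One common approach, as a sketch (assuming the flink-hadoop-compatibility and parquet-avro dependencies are on the classpath; the path is a placeholder):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path("hdfs:///data/file.snappy.parquet"));
HadoopInputFormat<Void, GenericRecord> format = new HadoopInputFormat<>(
    new AvroParquetInputFormat<GenericRecord>(), Void.class, GenericRecord.class, job);
// Snappy decompression is handled transparently by the Parquet reader
DataSet<Tuple2<Void, GenericRecord>> records = env.createInput(format);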
-Sohi
Thanks Stefan.
Hi,
I have a directory in HDFS containing 20 files with 150 million records.
I just want a random 20 million records from that directory (sampled data).
I see that there are a few implementations in Flink
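One of those, as a minimal sketch (assuming DataSetUtils from flink-java; the path is a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.DataSetUtils;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs:///data/dir");
// Fraction-based sampling without replacement: ~20M out of 150M records
DataSet<String> sampled = DataSetUtils.sample(lines, false, 20.0 / 150.0);
// For an exact sample size (more expensive):
// DataSet<String> exact = DataSetUtils.sampleWithSize(lines, false, 20_000_000);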
Let's assume I have following class :
public class TestFlatMap extends RichFlatMapFunction {
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //
Hi Stefan,
Here is the main class code:
final String outFile = getOutFileName(backupDir);
final Set keys = getAllRedisKeys(parameters);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.fromCollection(keys)
Hi,
I am getting a Java heap space error while running a Flink job (Flink 1.2).
Use case: I am getting all keys from Redis matching a specific pattern,
then streaming over those keys, reading the data from Redis for each key,
and writing it to a file in HDFS.
The job was running fine for a few days but
Can you point me to code to deserialize this data? That would be great.
I tried to use the file system as the backend store:
env.setStateBackend(new
FsStateBackend("file:///home/user/Desktop/data/flink/checkpoints"));
After running the job locally, I see a folder named
*a2716e2992bfb6f8796347328ec23c82* under the directory
/home/user/Desktop/data/flink/checkpoints.
What
Is there a way to read checkpoint data (if not configured to save to HDFS)
through the REST API (or some other way) for monitoring purposes?
My bad :-) Sorry.
We are using Flink 1.2 dependencies, and I think this functionality is
only available from the Flink 1.3 API version.
Hi,
I see that the Flink Kafka consumer has the ability to set a specific
offset to read from:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
Thanks for the reply, Robert.
How do I specify the start position of the consumer for
FlinkKafkaConsumer010?
Because methods such as setStartFromSpecificOffsets specified in the
documentation
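For reference, a minimal sketch of how the documented example wires together (topic, offsets, and properties are placeholders; as noted elsewhere in this thread, setStartFromSpecificOffsets is only available from Flink 1.3):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "replay-group");

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), props);

Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("topic", 0), 23L);
offsets.put(new KafkaTopicPartition("topic", 1), 31L);
consumer.setStartFromSpecificOffsets(offsets);  // replay from these offsets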
Hi,
I am trying to replay Kafka logs from a specific offset, but I am not able
to make it work.
Using ref:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
My code:
import
A few last doubts:
1. So if I increase parallelism, will latency decrease because the load
gets distributed?
2. But if the load increases, will latency also increase if parallelism
is higher?
3. Let's say I remove the partitioner and the HBase op is still there in
the flat map. Then also this latency
So it means that when elements leave:
map => sit in buffer (due to the partitioner) => enter flatmap
Since the HBase ops in the flat map take, let's say, 1 sec per operation,
the next element will not be read from the buffer until the HBase op is done.
Due to this HBase op, the time to enter the flat map from the map
I had the same concern regarding HBase, so I also added a metric to measure
the HBase op time in the flatmap (basically the complete flatmap op).
From the metrics I see that approx 96% of op times were under 1 sec.
(I could still do a dummy run without the HBase op, but do these timings
make sense?)
The source is Kafka.
The FlatMap has an HBase lookup.
The sink is Kafka.
I tried to get stats over the days. I see that almost 40% had a latency of
0 seconds, 10% 0-30 sec, approx 10% 30-60 sec, 10% around 60-120 sec, and
30% around 120-210 secs.
So in the following execution flow:
source -> map -> partitioner -> flatmap -> sink
I am attaching the current time to the tuple while emitting from the map
function, then extracting that timestamp value from the tuple in the
flatmap as the very first step. Then I calculate the difference between
the time attached
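For reference, a minimal sketch of that measurement (hypothetical tuple layout: f0 = payload, f1 = the timestamp attached in map; note that wall-clock differences are only meaningful while both operators run on the same machine or on synchronized clocks):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.fromElements("a", "b", "c");  // placeholder source

DataStream<Tuple2<String, Long>> stamped = source
    .map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String value) {
            return Tuple2.of(value, System.currentTimeMillis());  // attach emit time
        }
    });

stamped.flatMap(new FlatMapFunction<Tuple2<String, Long>, String>() {
    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<String> out) {
        long transitMillis = System.currentTimeMillis() - in.f1;  // map -> flatmap delay
        out.collect(in.f0 + " transit=" + transitMillis + "ms");
    }
});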
Hi Chesnay,
I have data categorized by some attribute (the key in the partition) which
has n possible values. As of now the job is enabled for only one value of
that attribute. In a couple of days we will enable all values of the
attribute with more parallelism so that each attribute type's data gets
I have an execution flow (streaming job) with parallelism 1:
source -> map -> partitioner -> flatmap -> sink
Adding the partitioner starts a new thread, but the partitioner is
spending an average of 2 to 4 minutes moving data from map to flatmap.
For more details about this:
Cool, thanks. Closing the thread.
Basically, every time I call the add-metric method it is just registering
the gauge.
I can register this gauge in the open method and then update the value of
the gauge in flatmap.
Right?
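For reference, a minimal sketch of that register-once / update-per-record pattern (class, metric, and timing names are hypothetical):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

public class DbLookupFlatMap extends RichFlatMapFunction<String, String> {
    private transient long lastOpTimeMillis;

    @Override
    public void open(Configuration parameters) {
        // Register once; the reporter calls getValue() on every metrics fetch
        getRuntimeContext().getMetricGroup().gauge("dbOpTime", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return lastOpTimeMillis;
            }
        });
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        long start = System.currentTimeMillis();
        // ... DB operation ...
        lastOpTimeMillis = System.currentTimeMillis() - start;  // update only
        out.collect(value);
    }
}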
Here it is:
import com.codahale.metrics.SlidingWindowReservoir;
import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple;
import in.dailyhunt.cis.enrichments.datatype.SinkTuple;
import org.apache.flink.api.common.accumulators.LongCounter;
import
I ran the job and monitored it for approx 20 mins.
I tried with meter, accumulators, histogram, and gauge.
Of those, only the meter and accumulators were updating values; the others
were only showing a constant value all the time.
I was testing the Flink gauge metric in my flat map function. I was just
sending a value (end time - start time) of a DB op in the flatmap, but the
dashboard is only showing the first value instead of the updated value.
I am using the following code in my flat map:
getRuntimeContext()
.getMetricGroup()
Thanks for the pointers, Aljoscha.
I was just wondering: since the custom partitioner will run in a separate
thread, is it possible that map -> custom partition -> flat map can take
more than 200 seconds if parallelism is still 1?
You are right, Aljoscha. The job graph is split after introducing the
partitioner.
I was under the impression that if parallelism is set, everything would be
chained together.
Can you explain how data will flow for map -> partitioner -> flatmap if
parallelism, or it would be great if you point me to the right
Hi,
I have a streaming job which is running with parallelism 1 as of now.
(This job will run with parallelism > 1 in the future.)
So I have added a custom partitioner to partition the data based on one
tuple field.
The flow is:
source -> map -> partitioner -> flatmap -> sink
The partitioner is
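For reference, a minimal sketch of the wiring (hypothetical elements and a simple modulo partitioner on the first tuple field):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, String>> mapped =
    env.fromElements(Tuple2.of("k1", "v1"), Tuple2.of("k2", "v2"));  // stands in for source -> map

DataStream<Tuple2<String, String>> partitioned = mapped.partitionCustom(
    new Partitioner<String>() {
        @Override
        public int partition(String key, int numPartitions) {
            return Math.abs(key.hashCode()) % numPartitions;
        }
    },
    (KeySelector<Tuple2<String, String>, String>) value -> value.f0);

partitioned.print();  // stands in for flatmap -> sink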
Thanks Aljoscha for the reply.
Is there any way I can add custom metrics or a counter in
FlinkKafkaProducer010?
I am running a Flink streaming job with parallelism 1.
Suddenly, after 4 hours, the job failed. It showed:
Container container_e39_1492083788459_0676_01_02 is completed with
diagnostics: Container
[pid=79546,containerID=container_e39_1492083788459_0676_01_02] is
running beyond physical