Re: OOM error for heap state backend.

2020-08-26 Thread Vishwas Siravara
…state requirements. > All tasks in the same task manager share the JVM heap, as the task manager runs one JVM process on the machine where it is deployed. > Best, Andrey > On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara wrote: >> Hi Andrey, …

OOM error for heap state backend.

2020-08-21 Thread Vishwas Siravara
Hi guys, I use flink version 1.7.2. I have a stateful streaming job which uses a keyed process function, with the heap state backend. Although I set the TM heap size to 16 GB, I get an OOM error when the state size is around 2.5 GB (from the dashboard I get the state size). I have set taskmanager.memory.fraction: …
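Editor's note: heap backends keep all working state as live objects on the TM heap, so usable state is far smaller than the configured heap. One commonly suggested remedy (standard Flink practice, not taken from this thread) is the RocksDB backend, which keeps working state off-heap. A minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath and with an illustrative bucket path:

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Working state lives in native RocksDB memory/disk, not on the JVM heap;
// only checkpoints go to the given URI (hypothetical bucket).
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints"))
```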

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
…ing this to your flink-conf.yaml? > s3.staging-directory: /usr/mware/flink/tmp > On 20/08/2020 20:50, Vishwas Siravara wrote: > Hi Piotr, I did some analysis and realised that the temp files for s3 checkpoints are staged in /tmp although *io.tmp.dirs* is set to a …

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
…to change the staging directory for s3 checkpoints? Best, Vishwas On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara wrote: > Hi Piotr, Thank you for your suggestion. I will try that; are the temporary files created in the directory set in *io.tmp.dirs* in the flink-conf.yaml …

Re: No space left on device exception

2020-08-20 Thread Vishwas Siravara
…Piotrek > On Thu, Aug 20, 2020 at 00:56, Vishwas Siravara wrote: >> Hi guys, I have a deduplication job that runs on flink 1.7 and has some state, using the FsState backend. My TM heap size is 16 GB. I see the below error while trying to checkpoint …

No space left on device exception

2020-08-19 Thread Vishwas Siravara
Hi guys, I have a deduplication job running on flink 1.7 with some state, using the FsState backend. My TM heap size is 16 GB. I see the below error while trying to checkpoint a state of size 2 GB. There is enough space available in s3; I tried to upload larger files and they were all …

Re: Non parallel file sources

2020-06-23 Thread Vishwas Siravara
Thanks, that makes sense. On Tue, Jun 23, 2020 at 2:13 PM Laurent Exsteens <laurent.exste...@euranova.eu> wrote: > Hi Nick, > On a project I worked on, we simply made the file accessible on a shared NFS drive. Our source was custom, and we forced it to parallelism 1 inside the job, so …
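A minimal sketch of the approach described above (the shared NFS path is an assumption): read the file from a mount visible to every TM node and pin the source to a single subtask:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env
  .readTextFile("/mnt/shared/input.txt") // hypothetical path; must be readable from every TM node
  .setParallelism(1)                     // force a non-parallel source
```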

Re: Providing hdfs name node IP for streaming file sink

2020-03-03 Thread Vishwas Siravara
Thanks Yang. Going with setting HADOOP_CONF_DIR in the flink application; it integrates neatly with flink. Best, Nick. On Mon, Mar 2, 2020 at 7:42 PM Yang Wang wrote: > It may work. However, you need to set your own retry policy (similar to `ConfiguredFailoverProxyProvider` in hadoop). …

Re: Exactly once semantics for hdfs sink

2020-02-11 Thread Vishwas Siravara
…File Sink does support exactly-once semantics and can be used with HDFS. > Regards, Roman > On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara wrote: >> Hi all, I want to use the StreamingFile sink for writing data to hdfs. Can I achieve exactly once semantics with this sink? Best, HW.
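A hedged sketch of the sink under discussion (paths illustrative; assumes `env` and a `DataStream[String]` named `stream` are in scope). Exactly-once hinges on checkpointing, since in-progress part files are only finalized when a checkpoint completes:

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// Row-format sink to HDFS; part files are committed on checkpoint completion.
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs:///data/out"), new SimpleStringEncoder[String]("UTF-8"))
  .build()

env.enableCheckpointing(60000) // required, otherwise parts stay in-progress
stream.addSink(sink)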

Exactly once semantics for hdfs sink

2020-02-10 Thread Vishwas Siravara
Hi all, I want to use the StreamingFile sink for writing data to hdfs. Can I achieve exactly once semantics with this sink? Best, HW.

Unit testing filter function in flink

2019-12-19 Thread Vishwas Siravara
Hi guys, I want to test a function like:

private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || …
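One way to make such logic unit-testable (a sketch; the predicate shown is a simplification of the snippet above) is to extract the anonymous FilterFunction into a named class and call filter() directly, with no cluster involved:

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.FilterFunction

// Named, stateless predicate pulled out of filterStream so it can be tested alone.
class NonNullRecordFilter extends FilterFunction[GenericRecord] {
  override def filter(value: GenericRecord): Boolean = value != null // simplified condition
}

// e.g. in a ScalaTest suite:
//   new NonNullRecordFilter().filter(null) shouldBe false
```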

flink's hard dependency on zookeeper for HA

2019-11-06 Thread Vishwas Siravara
Hi all, I am using flink 1.7.2 as a standalone cluster in high availability mode with zookeeper. I have noticed that all flink processes go down once zookeeper goes down. Is this expected behavior, given that leader election has already happened and the job has been running for several hours?

Partitioning based on key flink kafka sink

2019-11-05 Thread Vishwas Siravara
Hi all, I am using flink 1.7.0 and using this constructor: FlinkKafkaProducer(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig). From the doc, this constructor uses the fixed partitioner. I want to partition based on key, so I tried to use this public …
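For the universal producer, passing an empty custom partitioner makes Flink fall back to partitioning records by their serialized key instead of using FlinkFixedPartitioner. A sketch (the topic name and schema are assumptions):

```scala
import java.util.{Optional, Properties}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

// keyedSchema must supply the record key (serializeKey) for this to matter.
def keyPartitionedProducer(keyedSchema: KeyedSerializationSchema[String],
                           props: Properties): FlinkKafkaProducer[String] =
  new FlinkKafkaProducer[String](
    "output-topic",   // hypothetical topic
    keyedSchema,
    props,
    // Empty partitioner => records are partitioned by key, not by sink subtask.
    Optional.empty[FlinkKafkaPartitioner[String]]())
```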

Re-ordering events with flink

2019-11-01 Thread Vishwas Siravara
Hi guys, I want to know if it's possible to sort events in a flink data stream. I know I can't sort a stream as such, but is there a way I can buffer for a very short time and sort those events before sending them to a data sink? In our scenario we consume from a kafka topic which has multiple …
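A compact sketch of the buffer-and-sort idea (the event type, key, and timestamp field are illustrative assumptions): a KeyedProcessFunction collects events in state and emits them in timestamp order once the watermark passes them:

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Event(key: String, ts: Long) // illustrative event type

class SortBuffer extends KeyedProcessFunction[String, Event, Event] {
  @transient private lazy val buf: ListState[Event] =
    getRuntimeContext.getListState(new ListStateDescriptor[Event]("buf", classOf[Event]))

  override def processElement(e: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    buf.add(e)
    ctx.timerService().registerEventTimeTimer(e.ts) // fires once the watermark reaches e.ts
  }

  override def onTimer(ts: Long,
      ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    // Emit everything the watermark has covered, in order; keep the rest buffered.
    val (ready, rest) = buf.get().asScala.toSeq.partition(_.ts <= ts)
    ready.sortBy(_.ts).foreach(out.collect)
    buf.update(rest.asJava)
  }
}
```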

Re: Consumer group and flink

2019-10-16 Thread Vishwas Siravara
Please ignore this email. On Wed, Oct 16, 2019 at 1:40 PM Vishwas Siravara wrote: > Hi guys, Is it necessary to specify a consumer group name for a kafka streaming job when checkpointing is enabled? Since the offsets are not stored in kafka, how does specifying a consumer group help? …

Consumer group and flink

2019-10-16 Thread Vishwas Siravara
Hi guys, Is it necessary to specify a consumer group name for a kafka streaming job when checkpointing is enabled? Since the offsets are not stored in kafka, how does specifying a consumer group help? Best, Vishwas

Re: Flink restoring a job from a checkpoint

2019-10-09 Thread Vishwas Siravara
…checkpoint > [2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints > Best, Congxian > On Wed, Oct 9, 2019 at 5:07 AM, Vishwas Siravara wrote: >> Hi Yun, Thanks for your reply. I do start from GROUP_OFFSETS …

Flink restoring a job from a checkpoint

2019-10-08 Thread Vishwas Siravara
Hi guys, I have a flink streaming job which streams from a kafka source. There is no state in the job, just a simple filter, map and write to a kafka sink. Suppose I stop my job and then submit it again to the cluster with the same consumer group; will the job restore automatically from the …

Increasing number of task slots in the task manager

2019-10-01 Thread Vishwas Siravara
Hi guys, I get a java heap space error when I have 1 GB of TM memory and 4 slots per TM (we have 4 cores in our lower environment), so each slot has 1/4 GB of managed memory. From the flink doc https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources I …

High availability flink job

2019-09-15 Thread Vishwas Siravara
Hi guys, I have a flink job running in standalone mode with a parallelism > 1 that produces data to a kafka sink. My topic is replicated with a replication factor of 2. Now suppose one of the kafka brokers goes down; will my streaming job fail? Is there a way I can continue …

Flink kafka producer partitioning scheme

2019-09-13 Thread Vishwas Siravara
Hi guys, From the flink doc: *By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use a FlinkFixedPartitioner that maps each Flink Kafka Producer parallel subtask to a single Kafka partition (i.e., all records received by a sink subtask will end up in the same Kafka partition).* …
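For completeness, a sketch of replacing the default FlinkFixedPartitioner with a custom partitioner (the event type and field name are assumptions):

```scala
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

case class MyEvent(accountId: String, payload: String) // illustrative event type

// Route each record by a hash of one of its own fields instead of by sink subtask.
class ByAccountPartitioner extends FlinkKafkaPartitioner[MyEvent] {
  override def partition(record: MyEvent, key: Array[Byte], value: Array[Byte],
                         targetTopic: String, partitions: Array[Int]): Int =
    partitions(math.abs(record.accountId.hashCode % partitions.length))
}
```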

externalizing config files for flink class loader

2019-09-12 Thread Vishwas Siravara
I have a standalone cluster. I have added my own library (jar file) to the lib/ folder in flink. I submit my job from the cli after I start the cluster. Now I want to externalize a property file which has to be read by this library. Since this library is loaded by flink's classloader and not the …

Using FlinkKafkaConsumer API

2019-09-09 Thread Vishwas Siravara
I am using flink-kafka-connector and this is my dependency: "org.apache.flink" %% "flink-connector-kafka" % "1.7.0". When I look at my dependency tree, the kafka client version is org.apache.kafka:kafka-clients:2.0.1, which comes from the above package. However when I run my code in the …

Re: understanding task manager logs

2019-09-06 Thread Vishwas Siravara
…g/apache/kafka/common/utils/AppInfoParser.java#L117 > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector > On Tue, Sep 3, 2019 at 04:04, Vishwas Siravara <vsirav...@gmail.com> wrote: >> …

Flink Kafka Connector

2019-09-05 Thread Vishwas Siravara
Hi guys, I am using the flink connector for kafka from 1.9.0. Here is my sbt dependency: "org.apache.flink" %% "flink-connector-kafka" % "1.9.0". When I check the log file I see that the kafka version is 0.10.2.0. According to the docs, from 1.9.0 onwards the version should be 2.2.0. Why do …

understanding task manager logs

2019-09-02 Thread Vishwas Siravara
Hi guys, I am using flink 1.7.2 and my application consumes from a kafka topic and publishes to another kafka topic which is in its own kafka environment running a different kafka version. I am using FlinkKafkaConsumer010 from this dependency: "org.apache.flink" %% "flink-connector-kafka-0.10" % …

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
…rity.plain.PlainLoginModule required username=\"myuser\" password=\"\";" > sasl.mechanism=PLAIN > security.protocol=SASL_SSL > On Thu, Aug 29, 2019 at 18:20, Vishwas Siravara wrote: >> Hey David, My consumers are reg…

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
…sumption. > On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara wrote: >> I see this log as well, but I can't see any messages. I know for a fact that the topic I am subscribed to has messages as I c…

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
…}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara wrote: > Hi guys, I am using kerberos for my kafka source …

Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Hi guys, I am using kerberos for my kafka source. I pass the jaas config and krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
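An alternative to wiring JAAS through env.java.opts is Flink's built-in Kerberos support; a flink-conf.yaml sketch (keytab path and principal are placeholders):

```yaml
# Flink-managed Kerberos login for the Kafka connector (values illustrative).
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: myuser@EXAMPLE.COM
security.kerberos.login.contexts: KafkaClient
security.kerberos.login.use-ticket-cache: false
```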

Re: Loading dylibs

2019-08-28 Thread Vishwas Siravara
…a known issue in the Flink Jira project [1]. > Is it possible that you have encountered the same problem? > [1]: https://issues.apache.org/jira/browse/FLINK-11402 > Regards, Aleksey > On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara wrote: …

Re: Loading dylibs

2019-08-27 Thread Vishwas Siravara
…s? Thanks, Vishwas On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke wrote: > I don't know dylibs in detail, but can you call a static method which checks whether the library has already been loaded and, if not, loads it (singleton pattern)? > On 27.08.2019 at 06:39, … wrote: …
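A sketch of the singleton guard Jörn suggests (library name taken from the original post). Note it only prevents double-loading within one classloader; loads from a fresh job classloader after a restart are the separate issue tracked in FLINK-11402 below:

```scala
// Load the native library at most once per classloader.
object NativeLib {
  @volatile private var loaded = false
  def ensureLoaded(): Unit = synchronized {
    if (!loaded) {
      System.loadLibrary("vibesimplejava")
      loaded = true
    }
  }
}
```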

Loading dylibs

2019-08-26 Thread Vishwas Siravara
Hi guys, I have a flink application that loads a dylib like this: System.loadLibrary("vibesimplejava"). The application runs fine, but when I restart the job I get this exception: com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected error java.lang.UnsatisfiedLinkError: Native …

Re: Using shell environment variables

2019-08-25 Thread Vishwas Siravara
You can also link at runtime by providing the path to the dylib, by adding -Djava.library.path= to the jvm options in the task manager. On Sat, Aug 24, 2019 at 9:11 PM Zhu Zhu wrote: > Hi Abhishek, > You need to export the environment variables on all the worker machines (not the machine used to submit …

Re: Use logback instead of log4j

2019-08-25 Thread Vishwas Siravara
Any idea on how I can use logback instead? On Fri, Aug 23, 2019 at 1:22 PM Vishwas Siravara wrote: > Hi, From the flink doc, in order to use logback instead of log4j: "Users willing to use logback instead of log4j can just exclude log4j (or delete it from the lib/ …

Re: Externalized checkpoints

2019-08-25 Thread Vishwas Siravara
…tml#retained-checkpoints > Best, Congxian > Zhu Zhu wrote on Thu, Aug 22, 2019 at 10:13 AM: >> Hi Vishwas, You can configure "state.checkpoints.num-retained" to specify the max checkpoints to retain. By default it is 1. …
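The setting mentioned above, as it would appear in flink-conf.yaml (the count is an example):

```yaml
# Retain the last 3 completed checkpoints instead of the default 1.
state.checkpoints.num-retained: 3
```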

Use logback instead of log4j

2019-08-23 Thread Vishwas Siravara
Hi, From the flink doc, in order to use logback instead of log4j: "Users willing to use logback instead of log4j can just exclude log4j (or delete it from the lib/ folder)." https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html. However, when I delete it from the lib …
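Besides removing the log4j jars from lib/ (and adding logback-classic and logback-core there), the application's own dependencies must not drag log4j back in; an sbt sketch with illustrative versions:

```scala
// build.sbt sketch: swap the log4j binding for logback.
libraryDependencies ++= Seq(
  ("org.apache.flink" %% "flink-streaming-scala" % "1.7.2")
    .exclude("log4j", "log4j")
    .exclude("org.slf4j", "slf4j-log4j12"),
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)
```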

Re: Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
I am also using exactly-once checkpointing mode; I have a kafka source and sink, both of which support transactions, which should allow for exactly-once processing. Is this the reason why there is only one checkpoint retained? Thanks, Vishwas On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara wrote: …

Externalized checkpoints

2019-08-21 Thread Vishwas Siravara
Hi peeps, I am externalizing checkpoints in S3 for my flink job and I retain them on cancellation. However, when I look into the S3 bucket where the checkpoints are stored, there is only 1 checkpoint at any point in time. Is this the default behavior of flink, where older checkpoints are deleted when …

Flink logback

2019-08-21 Thread Vishwas Siravara
Hi all, I modified the logback.xml provided by the flink distribution, so now the logback.xml file looks (in part) like this: <file>${log.file}</file> <append>false</append> … <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] …

Configuring logback

2019-08-20 Thread Vishwas Siravara
Hi guys, I am using logback for my application logs. I have logback.xml as part of the fat jar that I submit to flink via the command line, flink run "...". When I run my application from the IDE, the appenders are what I have set in my logback, but when I run from the command line the appender defaults to …

Re: Configuring logback for my flink job

2019-08-20 Thread Vishwas Siravara
…ink-dist/src/main/flink-bin/bin/flink#L52 > Vishwas Siravara wrote on Mon, Aug 19, 2019 at 11:02 PM: >> Hi, I have a logback config for my flink application which is packaged with the application fat jar. However, when I submit my job from the flink command line tool, I see that …

Configuring logback for my flink job

2019-08-19 Thread Vishwas Siravara
Hi, I have a logback config for my flink application which is packaged with the application fat jar. However, when I submit my job from the flink command line tool, I see that logback is set to -Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml from the client log. As a result my application …

Re: Understanding job flow

2019-08-16 Thread Vishwas Siravara
…different nodes, it is initialized each time on all 3 nodes (task managers). I wonder why this happens. Thanks, Vishwas On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson wrote: > @transient or use a static factory. > In Scala we use a @transient lazy val with an initializer to do this …
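A sketch of the @transient lazy val pattern from Steven's reply (the encryptor type and factory are stand-ins for the real library): the field is not serialized with the job graph, and each task manager builds it locally on first use, which is why initialization is observed once per node:

```scala
import org.apache.flink.api.common.functions.RichMapFunction

// Hypothetical encryptor interface standing in for the real library.
trait Encryptor extends Serializable { def encrypt(s: String): String }
object Encryptor { def create(): Encryptor = new Encryptor { def encrypt(s: String) = s.reverse } } // stub

class EncryptingMap extends RichMapFunction[String, String] {
  // @transient + lazy: not shipped with the serialized function; built per task JVM.
  @transient private lazy val encryptor: Encryptor = Encryptor.create()
  override def map(value: String): String = encryptor.encrypt(value)
}
```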

Understanding job flow

2019-08-15 Thread Vishwas Siravara
Hi guys, I have a map job where I want to encrypt certain keys. I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when my job runs on multiple task managers with parallelism > 1? Thanks, Vishwas

Flink job parallelism

2019-08-15 Thread Vishwas Siravara
Hi guys, I have a flink job which I want to run with a parallelism of 2. I run it from the command line like: flink run -p 2 -C file:///home/was/classpathconfig/ -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid. My cluster has two task managers with only 1 task slot each.

Re: How can I pass multiple java options in standalone mode ?

2019-08-15 Thread Vishwas Siravara
…do you want to do. > Best, Yang > Vishwas Siravara wrote on Thu, Aug 15, 2019 at 3:39 AM: >> Thanks a lot, I fixed that, so now this works when I submit my job with the flink UI, but when I submit it via flink run (command line) it does not take this env.java.opts …

External classpath

2019-08-14 Thread Vishwas Siravara
Hi guys, I'm very close to deploying my application in production, so I am trying to externalize some of the config files, which have to be available on the classpath when I run my application via the flink command line interface. From the flink doc I can add to the classpath with -C,--classpath …

How can I pass jvm options to flink when started from command line

2019-08-14 Thread Vishwas Siravara
I understand that when I run a flink job from the command line it forks a jvm and runs the main method, while the flink-related code runs in the task managers. So when I say "flink run", the main does not run on the JobManager, hence it does not take env.java.opts set in flink-conf.yaml, as this applies …

Re: How can I pass multiple java options in standalone mode ?

2019-08-14 Thread Vishwas Siravara
…manager. Thanks, Vishwas On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic <amastilo...@sightmachine.com> wrote: > It's a YAML file, so I think you need to do something like: env.java.opts: -Dconfig.resource=qa.conf > On Aug 14, 2019, at 11:58 AM, Vishwas Siravara …

How can I pass multiple java options in standalone mode ?

2019-08-12 Thread Vishwas Siravara
Hi guys, I have this entry in the flink-conf.yaml file for jvm options: env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf". Is this supposed to be a comma-separated list? I get a parse exception when …
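As the reply in this thread notes, env.java.opts is a single YAML string of JVM flags, and JVM flags are space-separated, not comma-separated. A sketch with illustrative paths:

```yaml
env.java.opts: "-Djava.security.auth.login.config=/opt/flink/kafka-jaas.conf -Djava.security.krb5.conf=/opt/flink/krb5.conf"
```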

Passing jvm options to flink

2019-08-07 Thread Vishwas Siravara
Hi, I am running flink on a standalone cluster without any resource manager like yarn or K8s. I am submitting my job using the command line "flink run ...". I have a couple of questions: 1. How can I pass JVM parameters to this job? I want to pass a parameter for a dylib like this …

Streaming from a file

2019-08-01 Thread Vishwas Siravara
Hi guys, Is it possible for flink to stream from a unix file system line by line? When I use readTextFile(path) ("Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings"), the entire contents of the file come as a datastream, over which …
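For what it's worth, readTextFile does produce a DataStream[String] with one element per line, so downstream operators see the file line by line rather than as one blob; a minimal sketch with an illustrative path:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env.readTextFile("/data/events.log") // one element per line
lines.filter(_.nonEmpty).print()
env.execute("file-lines")
```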

Re: S3 checkpointing exception

2019-07-20 Thread Vishwas Siravara
I found the solution to this problem; it was a dependency issue, I had to exclude "xml-apis" to get it fixed. Also, the s3-presto jar provides better error messages, which was helpful. Thanks, Vishwas On Thu, Jul 18, 2019 at 8:14 PM Vishwas Siravara wrote: > I am using ecs …

S3 checkpointing exception

2019-07-18 Thread Vishwas Siravara
I am using an ecs S3 instance to checkpoint, with the following configuration: s3.access-key: vdna_np_user, s3.endpoint: https://SU73ECSG**COM:9021, s3.secret-key: **. I set the checkpoint in the code like env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints")). I have a …

Flink s3 wire log

2019-07-18 Thread Vishwas Siravara
Here is my wire log while trying to checkpoint to ecs S3. I see the request got a 404; does this mean that it can't find the folder *checkpoints*? Since s3 does not have folders, what should I put there? Thanks so much for all the help that you guys have provided so far. Really appreciate it.

Re: Providing external files to flink classpath

2019-07-17 Thread Vishwas Siravara
…nk-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120 > Best, Yun Tang > From: Vishwas Siravara > Sent: Saturday, June 29, 2019 0:43 > To: user > Subject: Providing external files to flink classpath …

Questions about user doc.

2019-07-16 Thread Vishwas Siravara
Hey guys, In this document: https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html, there is a line at the beginning of the scheduling section which says: "A pipeline consists of multiple successive tasks, such as the *n-th* parallel instance of a MapFunction …

Unable to start task manager in debug mode

2019-07-08 Thread Vishwas Siravara
Hi guys, I am not able to start a standalone session with one task manager and one job manager on the same node after adding the debug option in flink-conf.yaml as env.java.opts: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 …
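One likely cause (an assumption, not confirmed in the thread): with a plain env.java.opts, the job manager and task manager JVMs on the same node both receive the agent and contend for port 5005, and suspend=y also blocks each JVM until a debugger attaches. Scoping the option to the task manager avoids both; a flink-conf.yaml sketch:

```yaml
# Attach the debug agent only to task manager JVMs; suspend=n so startup is not blocked.
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
```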

Providing external files to flink classpath

2019-06-28 Thread Vishwas Siravara
Hi, I am trying to add external property files to the flink classpath for my application. These files are not part of the fat jar. I put them under the lib folder, but flink can't find them. How can I manage external property files that need to be read by flink? Thanks, Vishwas

Error checkpointing to S3 like FS (EMC ECS)

2019-06-24 Thread Vishwas Siravara
Hi, I am using flink version 1.7.2 and trying to use S3-like object storage, EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not all S3 apis are supported by EMC ECS according to this document. Here is my config: s3.endpoint: SU73ECSG1P1d.***.COM, s3.access-key: …

Re: Unable to set S3 like object storage for state backend.

2019-06-24 Thread Vishwas Siravara
…s bucket names to contain an underscore. > I'm guessing that the Hadoop S3 code is trying to treat your path as a valid URI, but the bucket name doesn't conform, and thus you get the "null uri host" issue. > Could you try with a compliant bucket name? > — Ken …

Unable to set S3 like object storage for state backend.

2019-06-20 Thread Vishwas Siravara
Hi, I am using flink version 1.7.2 and trying to use S3-like object storage, EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for the s3 filesystem; I have placed it under the lib folder and it is available …