tate requirements.
> All tasks in the same task manager share the JVM heap, as the task
> manager runs as a single JVM process on the machine where it is deployed.
>
> Best,
> Andrey
>
> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara
> wrote:
>
>> Hi Andrey,
>>
Hi guys,
I use Flink version 1.7.2.
I have a stateful streaming job which uses a keyed process function. I use
the heap state backend. Although I set the TM heap size to 16 GB, I get an
OOM error when the state size is around 2.5 GB (as reported on the
dashboard). I have set taskmanager.memory.fraction:
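
A minimal flink-conf.yaml sketch of the memory options in play here,
assuming Flink 1.7-era option names; the values are illustrative only:

    taskmanager.heap.size: 16g          # JVM heap of each task manager
    taskmanager.memory.fraction: 0.7    # share of free memory reserved as managed memory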
ing this to your flink-conf.yaml?
>
> s3.staging-directory: /usr/mware/flink/tmp
>
> On 20/08/2020 20:50, Vishwas Siravara wrote:
>
> Hi Piotr,
> I did some analysis and realised that the temp files for s3
> checkpoints are staged in /tmp although the *io.tmp.dirs* is set to a
>
to change the staging directory
for s3 checkpoints ?
Best,
Vishwas
On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara
wrote:
> Hi Piotr,
> Thank you for your suggestion. I will try that. Are the temporary files
> created in the directory set in *io.tmp.dirs* in the flink-conf.yaml
> Piotrek
>
> On Thu, Aug 20, 2020 at 00:56 Vishwas Siravara
> wrote:
>
>> Hi guys,
>> I have a deduplication job that runs on Flink 1.7 and has some state
>> which uses the FsStateBackend. My TM heap size is 16 GB. I see the below error
>> while trying to che
Hi guys,
I have a deduplication job that runs on Flink 1.7 and has some state
which uses the FsStateBackend. My TM heap size is 16 GB. I see the below
error while trying to checkpoint a state of size 2 GB. There is enough
space available in S3; I tried to upload larger files and they were all
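
A hedged sketch of the setup being described, with an illustrative bucket
path; the FsStateBackend keeps working state on the TM heap and writes
checkpoint data to the configured filesystem:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Live state stays on the JVM heap; checkpoints are written to S3,
    // so the heap needs headroom for the serialized snapshot as well.
    env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"))
    env.enableCheckpointing(60000) // checkpoint every 60 s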
Thanks that makes sense.
On Tue, Jun 23, 2020 at 2:13 PM Laurent Exsteens <
laurent.exste...@euranova.eu> wrote:
> Hi Nick,
>
> On a project I worked on, we simply made the file accessible on a shared
> NFS drive.
> Our source was custom, and we forced it to parallelism 1 inside the job,
> so
Thanks Yang. I am going with setting HADOOP_CONF_DIR in the Flink
application; it integrates neatly with Flink.
Best,
Nick.
On Mon, Mar 2, 2020 at 7:42 PM Yang Wang wrote:
> It may work. However, you need to set your own retry policy (similar to
> `ConfiguredFailoverProxyProvider` in Hadoop).
>
> The StreamingFileSink does support exactly-once semantics and can be
> used with HDFS.
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara
> wrote:
>
>> Hi all,
>> I want to use the StreamingFileSink for writing data to HDFS. Can I
>> achieve exactly-once semantics with this sink?
>>
>>
>> Best,
>> HW.
>>
>
Hi all,
I want to use the StreamingFileSink for writing data to HDFS. Can I
achieve exactly-once semantics with this sink?
Best,
HW.
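
A minimal sketch of that sink, assuming an illustrative output path; the
sink commits part files when checkpoints complete, so exactly-once
delivery additionally requires checkpointing to be enabled:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    // Row-encoded sink: in-progress files are finalized atomically as
    // part of Flink's checkpoint mechanism.
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hdfs:///data/output"),
                    new SimpleStringEncoder[String]("UTF-8"))
      .build()

    stream.addSink(sink) // `stream` is an existing DataStream[String]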
Hi guys,
I want to test a function like:

    private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
      dataStream.filter(new FilterFunction[GenericRecord] {
        override def filter(value: GenericRecord): Boolean = {
          if (value == null ||
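
One hedged way to test such a function, simplified here to String
elements since the GenericRecord setup is not shown; DataStreamUtils.collect
runs the pipeline and pulls the results back into the test:

    import scala.collection.JavaConverters._
    import org.apache.flink.streaming.api.datastream.DataStreamUtils
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // keep output order deterministic for the assertion

    val input = env.fromElements("a", "", "b") // illustrative test data
    val filtered = input.filter(_.nonEmpty)    // stand-in for filterStream

    val results = DataStreamUtils.collect(filtered.javaStream).asScala.toList
    assert(results == List("a", "b"))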
Hi all,
I am using Flink 1.7.2 as a standalone cluster in high-availability mode
with ZooKeeper. I have noticed that all Flink processes go down once
ZooKeeper goes down. Is this expected behavior, given that leader election
has already happened and the job has been running for several hours?
Hi all,
I am using Flink 1.7.0 and using this constructor:
FlinkKafkaProducer(String topicId, KeyedSerializationSchema
serializationSchema, Properties producerConfig)
From the doc it says this constructor uses a fixed partitioner. I want to
partition based on the key, so I tried to use this
public
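
A hedged sketch of the usual way to get key-based partitioning: the
constructor variant that takes an Optional partitioner, passed as empty
so Kafka partitions each record by its serialized key. The String schema
and the broker address are illustrative stand-ins:

    import java.util.{Optional, Properties}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

    // Minimal key/value schema for String records (illustrative).
    class StringKeyedSchema extends KeyedSerializationSchema[String] {
      override def serializeKey(element: String): Array[Byte] = element.getBytes("UTF-8")
      override def serializeValue(element: String): Array[Byte] = element.getBytes("UTF-8")
      override def getTargetTopic(element: String): String = null // use the default topic
    }

    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "broker:9092")

    // An empty Optional bypasses the FlinkFixedPartitioner, so records
    // are partitioned by the key produced by serializeKey above.
    val producer = new FlinkKafkaProducer[String](
      "output-topic",
      new StringKeyedSchema,
      producerProps,
      Optional.empty[FlinkKafkaPartitioner[String]]()
    )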
Hi guys,
I want to know if it's possible to sort events in a Flink data stream. I
know I can't sort an unbounded stream, but is there a way in which I can
buffer for a very short time and sort those events before sending them to
a data sink? In our scenario we consume from a kafka topic which has multiple
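
A hedged sketch of the buffer-and-sort idea using a short tumbling
event-time window; the Event type and the sample elements are assumed
for illustration:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    case class Event(key: String, ts: Long, payload: String)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // `events` stands in for the Kafka-backed stream; timestamps and
    // watermarks must be assigned for event-time windows to fire.
    val events: DataStream[Event] = env
      .fromElements(Event("k", 10L, "a"), Event("k", 20L, "b"), Event("k", 30L, "c"))
      .assignAscendingTimestamps(_.ts)

    // Buffer one second of events per key, then emit them in ts order.
    val sorted = events
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {
        override def process(key: String, context: Context,
                             elements: Iterable[Event],
                             out: Collector[Event]): Unit =
          elements.toSeq.sortBy(_.ts).foreach(out.collect)
      })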
Please ignore this email.
On Wed, Oct 16, 2019 at 1:40 PM Vishwas Siravara
wrote:
> Hi guys,
> Is it necessary to specify a consumer group name for a kafka streaming job
> when checkpointing is enabled? Since the offsets are not stored in kafka
> how does specifying a consume
Hi guys,
Is it necessary to specify a consumer group name for a Kafka streaming job
when checkpointing is enabled? Since the offsets are not stored in Kafka,
how does specifying a consumer group help?
Best,
Vishwas
checkpoint
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints
>
> Best,
> Congxian
>
>
> On Wed, Oct 9, 2019 at 5:07 AM, Vishwas Siravara wrote:
>
>> Hi Yun,
>> Thanks for your reply. I do start from GROUP_OFFS
Hi guys,
I have a Flink streaming job which streams from a Kafka source. There is no
state in the job, just a simple filter, map, and write to a Kafka sink.
Suppose I stop my job and then submit the job again to the cluster with the
same consumer group, will the job restore automatically from the
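
A hedged sketch of the consumer start-position API relevant to this
question; the topic, group id, and broker address are illustrative:

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092")
    props.setProperty("group.id", "my-streaming-job")

    val consumer = new FlinkKafkaConsumer[String](
      "input-topic", new SimpleStringSchema(), props)
    // When not restoring from a savepoint/checkpoint, start from the
    // offsets last committed in Kafka for this group (the default).
    consumer.setStartFromGroupOffsets()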
Hi guys,
I get a Java heap space error when I have 1 GB of TM memory and 4 slots (we
have 4 cores in our lower environment) per TM; each slot has 1/4 GB of
managed memory.
From the flink doc
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
I
Hi guys,
I have a Flink job running in standalone mode with a parallelism of >1
that produces data to a Kafka sink. My topic is replicated with a
replication factor of 2. Now suppose one of the Kafka brokers goes down,
will my streaming job fail? Is there a way in which I can continue
Hi guys,
From the flink doc:
*By default, if a custom partitioner is not specified for the Flink Kafka
Producer, the producer will use a FlinkFixedPartitioner that maps each
Flink Kafka Producer parallel subtask to a single Kafka partition (i.e.,
all records received by a sink subtask will end up
I have a standalone cluster. I have added my own library (jar file) to the
lib/ folder in Flink. I submit my job from the CLI after I start the cluster.
Now I want to externalize a property file which has to be read by this
library. Since this library is loaded by Flink's classloader and not the
I am using the flink-kafka-connector and this is my dependency:
"org.apache.flink" %% "flink-connector-kafka" % "1.7.0",
When I look at my dependency tree, the kafka client version is
org.apache.kafka:kafka-clients:2.0.1, which comes from the above package.
However when I run my code in the
g/apache/kafka/common/utils/AppInfoParser.java#L117
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector
>
> On Tue, Sep 3, 2019 at 04:04, Vishwas Siravara <
> vsirav...@gmail.com> wrote:
>
>>
Hi guys,
I am using the Flink connector for Kafka from 1.9.0.
Here is my sbt dependency:
"org.apache.flink" %% "flink-connector-kafka" % "1.9.0",
When I check the log file I see that the Kafka version is 0.10.2.0.
According to the docs, from 1.9.0 onwards the version should be
2.2.0. Why do
Hi guys,
I am using Flink 1.7.2 and my application consumes from a Kafka topic and
publishes to another Kafka topic which is in its own Kafka environment
running a different Kafka version. I am using FlinkKafkaConsumer010 from
this dependency:
"org.apache.flink" %% "flink-connector-kafka-0.10" %
> sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
> required username=\"myuser\" password=\"\";"
> sasl.mechanism=PLAIN
> security.protocol=SASL_SSL
>
> On Thu, Aug 29, 2019 at 18:20, Vishwas Siravara
> wrote:
>
>> Hey David ,
>> My consumers are reg
sumption.
>
> On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara
> wrote:
>
>> I see this log as well , but I can't see any messages . I know for a fact
>> that the topic I am subscribed to has messages as I c
},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara
wrote:
> Hi guys,
> I am using kerberos for my kafka sou
Hi guys,
I am using Kerberos for my Kafka source. I pass the JAAS config and
krb5.conf in env.java.opts: -Dconfig.resource=qa.conf
-Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
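
For reference, a hedged flink-conf.yaml sketch of Flink's built-in
Kerberos support, which can stand in for hand-passing JAAS files; the
keytab path and principal are illustrative:

    security.kerberos.login.keytab: /path/to/flink.keytab
    security.kerberos.login.principal: flink-user@EXAMPLE.COM
    security.kerberos.login.contexts: Client,KafkaClient  # ZooKeeper and Kafka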
> This is a known issue in the Flink Jira project [1].
> Is it possible that you have encountered the same problem?
>
> [1]: https://issues.apache.org/jira/browse/FLINK-11402
>
> Regards,
> Aleksey
>
>
> On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara
> wrote:
>
>>
s?
Thanks,
Vishwas
On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke wrote:
> I don't know dylibs in detail, but can you call a static method that
> checks whether it has already been executed and, if not, loads the library
> (singleton pattern)?
>
> On Aug 27, 2019, at 06:39, Vishwas Siravara wrote:
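
A hedged Scala sketch of the singleton idea Jörn describes; note that if
the UnsatisfiedLinkError comes from the library having been loaded by a
previous job's classloader, a guard inside user code may not be enough:

    // Guard so System.loadLibrary runs at most once per classloader.
    object NativeLibLoader {
      @volatile private var loaded = false

      def ensureLoaded(): Unit = synchronized {
        if (!loaded) {
          System.loadLibrary("vibesimplejava")
          loaded = true
        }
      }
    }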
Hi guys,
I have a Flink application that loads a dylib like this:
System.loadLibrary("vibesimplejava");
The application runs fine, but when I restart the job I get this exception:
com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected
error java.lang.UnsatisfiedLinkError: Native
You can also link at runtime by providing the path to the dylib, adding
-Djava.library.path= to the JVM options in the task manager.
On Sat, Aug 24, 2019 at 9:11 PM Zhu Zhu wrote:
> Hi Abhishek,
>
> You need to export the environment variables on all the worker
> machines (not the machine used to submit
Any idea on how I can use logback instead?
On Fri, Aug 23, 2019 at 1:22 PM Vishwas Siravara
wrote:
> Hi ,
> From the flink doc , in order to use logback instead of log4j " Users
> willing to use logback instead of log4j can just exclude log4j (or delete
> it from the lib/
tml#retained-checkpoints
> Best,
> Congxian
>
>
> On Thu, Aug 22, 2019 at 10:13 AM, Zhu Zhu wrote:
>
>> Hi Vishwas,
>>
>> You can configure "state.checkpoints.num-retained" to specify the max
>> checkpoints to retain.
>> By default it is 1.
>>
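
A hedged flink-conf.yaml sketch of the option Zhu Zhu mentions (the value
is illustrative); completed checkpoints beyond this count are removed as
newer ones finish:

    state.checkpoints.num-retained: 3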
Hi,
From the flink doc, in order to use logback instead of log4j: "Users
willing to use logback instead of log4j can just exclude log4j (or delete
it from the lib/ folder)."
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html
However when I delete it from the lib
I am also using exactly-once checkpointing mode. I have a Kafka source and
sink, so both support transactions, which should allow for exactly-once
processing. Is this the reason why there is only one checkpoint retained?
Thanks,
Vishwas
On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara
wrote
Hi peeps,
I am externalizing checkpoints in S3 for my Flink job and I retain them on
cancellation. However, when I look into my S3 bucket where the checkpoints
are stored, there is only 1 checkpoint at any point in time. Is this the
default behavior of Flink, where older checkpoints are deleted when
Hi all,
I modified the logback.xml provided by flink distribution, so now the
logback.xml file looks like this:

    <appender name="file" class="ch.qos.logback.core.FileAppender">
      <file>${log.file}</file>
      <append>false</append>
      <encoder>
        <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread]
Hi guys,
I am using logback for my application logs. I have logback.xml as part of
the fat jar that I submit to Flink via the command line: flink run "...".
When I run my application from the IDE, the appenders are what I have set
in my logback, but when I run from the command line the appender defaults to
ink-dist/src/main/flink-bin/bin/flink#L52
>
> On Mon, Aug 19, 2019 at 11:02 PM, Vishwas Siravara wrote:
>
>> Hi,
>> I have a logback for my flink application which is packaged with the
>> application fat jar. However, when I submit my job from the Flink command line
>> tool, I see that
Hi,
I have a logback configuration for my Flink application which is packaged
with the application fat jar. However, when I submit my job from the Flink
command line tool, I see that logback is set to
-Dlogback.configurationFile=file:/data/flink-1.7.2/conf/logback.xml from
the client log.
As a result my application
different nodes,
it is initialized each time on all the 3 nodes (task managers). I wonder
why this happens.
Thanks,
Vishwas
On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson
wrote:
> @transient or use a static factory.
>
> In Scala we use a @transient lazy val with an initializer to do this
>
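
A hedged sketch of the @transient lazy val pattern Steven mentions; the
Event and Encryptor types are hypothetical stand-ins for the user's own:

    import org.apache.flink.api.common.functions.RichMapFunction

    // Hypothetical stand-ins for the user's types:
    case class Event(key: String, payload: String)
    trait Encryptor { def encrypt(s: String): String }
    object EncryptorFactory {
      def create(): Encryptor = new Encryptor { def encrypt(s: String) = s.reverse } // dummy
    }

    class EncryptKeys extends RichMapFunction[Event, Event] {
      // @transient + lazy: not shipped with the serialized function;
      // each parallel task builds its own instance on first use.
      @transient private lazy val encryptor: Encryptor = EncryptorFactory.create()

      override def map(e: Event): Event = e.copy(key = encryptor.encrypt(e.key))
    }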
Hi guys,
I have a map job where I want to encrypt certain keys. I initialize the
encryptor in the main method and apply it in the map function. How is this
encryptor shared when I have my job running on multiple task managers with
parallelism > 1?
Thanks,
Vishwas
Hi guys,
I have a Flink job which I want to run with a parallelism of 2.
I run it from the command line like: flink run -p 2 -C
file:///home/was/classpathconfig/ -c com.visa.flink.cli.Main
flink-job-assembly-0.1-SNAPSHOT.jar flink druid
My cluster has two task managers with only 1 task slot each.
do you want to do.
>
>
> Best,
> Yang
>
> On Thu, Aug 15, 2019 at 3:39 AM, Vishwas Siravara wrote:
>
>> Thanks a lot, I fixed that, so now this works when I submit my job with
>> the flink UI but when I submit it via flink run (command line) it does not
>> take this env.java.
Hi guys,
I am very close to deploying my application in production, so I am trying to
externalize some of the config files which have to be available on the
classpath when I run my application via the Flink command line interface.
From the flink doc I can add to the classpath by
-C,--classpath
I understand that when I run a Flink job from the command line it forks a
JVM and runs the main method, while the Flink-related code runs in the task
managers. So when I say "flink run", the main method does not run on the
JobManager, hence it does not pick up env.java.opts set in flink-conf.yaml,
as this applies
manager.
Thanks,
Vishwas
On Wed, Aug 14, 2019 at 2:35 PM Aleksandar Mastilovic <
amastilo...@sightmachine.com> wrote:
> It’s a YAML file, so I think you need to do something like
>
> env.java.opts: -Dconfig.resource=qa.conf
>
> On Aug 14, 2019, at 11:58 AM, Vishwas Siravara
Hi guys,
I have this entry in the flink-conf.yaml file for JVM options:
env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir
}}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir
}}/krb5.conf"
Is this supposed to be a comma-separated list? I get a parse exception when
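
For what it's worth: env.java.opts is handed to the JVM as plain
arguments, so multiple options are separated by spaces, not commas. A
hedged sketch, keeping the templated paths as placeholders:

    env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf -Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf"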
Hi,
I am running Flink on a standalone cluster without any resource manager
like YARN or K8s. I am submitting my job using the command line "flink run
...". I have a couple of questions:
1. How can I pass JVM parameters to this job? I want to pass a parameter
for a dylib like this
Hi guys,
Is it possible for Flink to stream from a Unix file system line by line?
When I use readTextFile(path), it reads text files, i.e. files that respect
the TextInputFormat specification, line by line, and returns them as Strings.
The entire contents of the file comes as a datastream, over which
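
A minimal sketch of that API with an illustrative path; readTextFile does
emit the file's contents line by line, one String record per line:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.readTextFile("file:///var/data/input.txt")
    lines.map(_.toUpperCase).print() // downstream operators see one line at a time
    env.execute("read-text-file")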
I found the solution to this problem; it was a dependency issue. I had to
exclude "xml-apis" to get this fixed. Also, the s3-presto jar provides
better error messages, which was helpful.
Thanks,
Vishwas
On Thu, Jul 18, 2019 at 8:14 PM Vishwas Siravara
wrote:
> I am using ec
I am using an ECS S3 instance to checkpoint, with the following
configuration:
s3.access-key: vdna_np_user
s3.endpoint: https://SU73ECSG**COM:9021
s3.secret-key: **
I set the checkpoint in the code like
env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
I have a
Here is my wire log while trying to checkpoint to ECS S3. I see the request
got a 404; does this mean that it can't find the folder "checkpoints"? Since
S3 does not have folders, what should I put there? Thanks so much for all
the help that you guys have provided so far. Really appreciate it.
nk-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120
>
> Best
> Yun Tang
>
> --
> *From:* Vishwas Siravara
> *Sent:* Saturday, June 29, 2019 0:43
> *To:* user
> *Subject:* Providing external files to flink classpa
Hey guys,
In this document:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
there is a line at the beginning of the scheduling section which says:
"A pipeline consists of multiple successive tasks, such as the *n-th*
parallel instance of a MapFunction
Hi guys,
I am not able to start a standalone session with one task manager and one
job manager on the same node by adding the debug option in flink-conf.yaml
as env.java.opts:
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 (
Hi,
I am trying to add external property files to the Flink classpath for
my application. These files are not part of the fat jar. I put them
under the lib folder but Flink can't find them. How can I manage
external property files that need to be read by Flink?
Thanks,
Vishwas
Hi,
I am using Flink version 1.7.2 and am trying to use the S3-like object
storage EMC ECS (
https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not
all S3 APIs are supported by EMC ECS according to this document. Here
is my config:
s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key:
AWS does not allow bucket names to contain an underscore.
>
> I’m guessing that the Hadoop S3 code is trying to treat your path as a valid
> URI, but the bucket name doesn't conform, and thus you get the "null uri
> host" issue.
>
> Could you try with a compliant bucket name?
>
> — Ken
>
Hi,
I am using Flink version 1.7.2 and am trying to use the S3-like object
storage EMC ECS (
https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm).
I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for
the S3 filesystem; I have placed it under the lib folder and it is
available