Flink InfluxDb connector not present in Maven

2021-04-17 Thread Vinay Patil
Hi Team,

The Flink InfluxDB connector `flink-connector-influxdb_2.1` is not present in
Maven. Could you please upload it?
https://repo.maven.apache.org/maven2/org/apache/bahir/

Regards,
Vinay Patil


Re: Flink Deployment on Kubernetes session Cluster

2020-07-29 Thread Vinay Patil
Hi Yang,

Thank you for your reply.

Yes, we have evaluated job-specific clusters (as that is how we used to deploy
on YARN). The main issue is monitoring multiple jobs, as we won't have a
single endpoint like YARN provides. We will evaluate the K8s operator you
have suggested.
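
One way to get a single monitoring view across several clusters could be to
poll each cluster's REST endpoint and merge the answers. Below is a minimal
sketch, assuming every JobManager REST address is reachable from where this
runs and serves Flink's `/jobs/overview` endpoint. The class name, the cluster
names and the URLs passed in are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: collects job overviews from many clusters.
class MultiClusterJobOverview {

    // Builds the job-overview URI for one cluster, tolerating a trailing slash.
    static URI overviewUri(String restBaseUrl) {
        String base = restBaseUrl.endsWith("/")
                ? restBaseUrl.substring(0, restBaseUrl.length() - 1)
                : restBaseUrl;
        return URI.create(base + "/jobs/overview");
    }

    // Returns cluster name -> raw JSON overview; unreachable clusters are recorded, not thrown.
    static Map<String, String> fetchAll(Map<String, String> restUrlByCluster) {
        HttpClient client = HttpClient.newHttpClient();
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> cluster : restUrlByCluster.entrySet()) {
            HttpRequest request = HttpRequest.newBuilder(overviewUri(cluster.getValue()))
                    .timeout(Duration.ofSeconds(5))
                    .GET()
                    .build();
            try {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                result.put(cluster.getKey(), response.body());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                result.put(cluster.getKey(), "UNREACHABLE: " + e);
            }
        }
        return result;
    }
}
```

The merged map could then feed one dashboard or alerting job, so the number of
clusters does not change how the jobs are observed.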


Thanks and Regards,
Vinay Patil


On Wed, Jul 29, 2020 at 11:08 AM Yang Wang  wrote:

> Hi Vinay Patil,
>
> You are right. Flink does not provide any isolation between different jobs
> in the same Flink session cluster.
> You could use Flink job cluster or application cluster(from 1.11) to get
> better isolation since a dedicated Flink
> cluster will be started for each job.
>
> Please refer to the standalone K8s job cluster[1] or native K8s
> application mode[2] for more information.
>
> If you want a tool for managing multiple jobs, maybe
> flink-k8s-operator is a good choice[3][4].
> I am also trying to build a Java-based flink-native-k8s-operator[5];
> please check it out if you are interested.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#deploy-job-cluster
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#flink-kubernetes-application
> [3]. https://github.com/lyft/flinkk8soperator
> [4]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [5]. https://github.com/wangyang0918/flink-native-k8s-operator
>
>
> Best,
> Yang
>
> On Wed, Jul 29, 2020 at 12:15 AM Vinay Patil wrote:
>
>> Hi Team,
>>
>> We have a session cluster running on K8 where multiple stateless jobs are
>> running fine. We observed that once we submit a stateful job (state size
>> per checkpoint is 1GB) to the same session cluster other jobs are impacted
>> because this job starts to utilise more memory and CPU and eventually
>> terminates the pod.
>>
>> To mitigate this issue and provide better resource isolation we have
>> created multiple session clusters where we will launch a high
>> throughput (stateful) job in one cluster and club low throughput jobs in
>> another cluster.
>> This seems to work fine but managing this will be painful once we start
>> to create more session cluster for high throughput jobs (10 plus jobs) as
>> we will not have a single flink endpoint to submit the job ( as we have it
>> in YARN where we submit directly to RM )
>>
>> Can you please provide me inputs on how we should handle this better in
>> Kubernetes
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>


Flink Deployment on Kubernetes session Cluster

2020-07-28 Thread Vinay Patil
Hi Team,

We have a session cluster running on K8s where multiple stateless jobs are
running fine. We observed that once we submit a stateful job (state size
per checkpoint is 1GB) to the same session cluster, the other jobs are
impacted because this job starts to utilise more memory and CPU, and the
pod is eventually terminated.

To mitigate this issue and provide better resource isolation, we have
created multiple session clusters, where we launch a high-throughput
(stateful) job in one cluster and group low-throughput jobs in another
cluster.
This seems to work fine, but managing this will be painful once we start to
create more session clusters for high-throughput jobs (10 plus jobs), as we
will not have a single Flink endpoint to submit jobs to (as we have in
YARN, where we submit directly to the RM).

Can you please provide inputs on how we should handle this better in
Kubernetes?



Regards,
Vinay Patil


Re: Timer metric in Flink

2020-06-11 Thread Vinay Patil
Oh okay, so basically implement a Gauge and add the timer functionality to it
for now.

Is there a plan or JIRA ticket to add a Timer metric in a future release? I
think it would be good to have.
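
For reference, a Gauge-backed timer could look roughly like the sketch below.
The `Gauge` interface here is only a local stand-in that mirrors the
single-method shape of Flink's `org.apache.flink.metrics.Gauge`, so that the
snippet compiles on its own; `TimerGauge` and its `time` method are
hypothetical names, not Flink classes.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Local stand-in for Flink's org.apache.flink.metrics.Gauge<T> (assumption:
// in a real job, implement the Flink interface instead of this one).
interface Gauge<T> {
    T getValue();
}

// Hypothetical helper: exposes the duration of the most recent timed call in milliseconds.
class TimerGauge implements Gauge<Long> {
    private volatile long lastDurationMillis;

    // Runs the action, records how long it took, and returns the action's result.
    <R> R time(Supplier<R> action) {
        long start = System.nanoTime();
        try {
            return action.get();
        } finally {
            lastDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    @Override
    public Long getValue() {
        return lastDurationMillis;
    }
}
```

In a rich function, such a gauge would typically be registered once in
`open()` via `getRuntimeContext().getMetricGroup().gauge(...)`, and the HBase
lookup would be wrapped in `time(...)`. A gauge only reports the latest value;
if the distribution of lookup times matters, a Histogram is probably the
better fit.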

Regards,
Vinay Patil


On Wed, Jun 10, 2020 at 5:55 PM Chesnay Schepler  wrote:

> You cannot add custom metric types, just implementations of the existing
> ones. Your timer(wrapper) will have to implement Gauge or Histogram.
>
> On 10/06/2020 14:17, Vinay Patil wrote:
>
> Hi,
>
> As timer metric is not provided out of the box, can I create a new
> MetricGroup by implementing this interface and add timer capability, this
> will be similar to Histogram wrapper Flink has provided. If yes, I can
> create a wrapper like
>
> `public TimerWrapper implements Timer` , in this case will also have to
> create Timer interface and add it to the metric group.
>
> Is this possible?
>
> I want to have a timer to check Hbase lookup time.
>
> Regards,
> Vinay Patil
>
>
>


Timer metric in Flink

2020-06-10 Thread Vinay Patil
Hi,

As a timer metric is not provided out of the box, can I create a new
MetricGroup by implementing this interface and add timer capability? This
would be similar to the Histogram wrapper Flink provides. If yes, I can
create a wrapper like

`public class TimerWrapper implements Timer`; in this case I will also have to
create a Timer interface and add it to the metric group.

Is this possible?

I want to have a timer to measure HBase lookup time.

Regards,
Vinay Patil


Re: Handling stale data enrichment

2020-04-24 Thread Vinay Patil
Hi Konstantin,

Thank you for your answer.

Yes, we have timestamps in the subscription stream

>the disadvantage that you do not make any progress until you see fresh
subscription data. Is this the desired behavior for your use case?
No, this is not acceptable, because the subscription data might be
slow-changing. Let's say it is not updated for 6 hrs. In that case the
click stream events keep flowing, and we want to enrich them against the
slow-moving stream.

In case of event-time joins/low-level joins, I am assuming that the
watermarks will still make progress because of
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources.
Or do we still have to handle it in the assigner and emit a watermark if we
have not received elements for a while? (I am not sure how this will work in
case of low-level joins.)

I am considering the low-level join approach using connected streams,
where I will keep the reference data in state (processElement1) and the
click stream events (processElement2) and join them. In this case I will
buffer the click stream events for a configurable period of time and then
delete them. (This is to handle late records.)
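
To make the idea concrete, here is a plain-Java model of the per-key logic. It
is only a sketch: in Flink this would presumably live in a co-process function
on the connected streams, with the maps replaced by keyed state and `onTimer`
driven by registered timers. All class and method names, and the `effectiveAt`
timestamp on subscription updates, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the buffering join; not a Flink class.
class BufferingEnricher {

    static final class Click {
        final String user;
        final String event;
        final long timestamp;

        Click(String user, String event, long timestamp) {
            this.user = user;
            this.event = event;
            this.timestamp = timestamp;
        }
    }

    private final long retentionMillis;
    private final Map<String, String> statusByUser = new HashMap<>();
    private final Map<String, List<Click>> bufferedClicks = new HashMap<>();
    final List<String> output = new ArrayList<>();

    BufferingEnricher(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Analogue of processElement1: store the reference value and re-emit
    // buffered clicks that happened at or after the update became effective.
    void onSubscriptionUpdate(String user, String status, long effectiveAt) {
        statusByUser.put(user, status);
        for (Click click : bufferedClicks.getOrDefault(user, Collections.emptyList())) {
            if (click.timestamp >= effectiveAt) {
                emit(click, status);
            }
        }
    }

    // Analogue of processElement2: enrich right away with what is known and
    // keep the click so that a late reference update can correct it.
    void onClick(Click click) {
        bufferedClicks.computeIfAbsent(click.user, u -> new ArrayList<>()).add(click);
        emit(click, statusByUser.getOrDefault(click.user, "UNKNOWN"));
    }

    // Analogue of a timer callback: drop clicks older than the retention period.
    void onTimer(long now) {
        for (List<Click> clicks : bufferedClicks.values()) {
            clicks.removeIf(click -> click.timestamp + retentionMillis <= now);
        }
    }

    private void emit(Click click, String status) {
        output.add(click.user + "/" + click.event + " -> " + status);
    }
}
```

With this shape the pipeline keeps making progress while the reference stream
is silent, at the cost of emitting a corrected record when a late update
arrives.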

I think the downstream consumer of the enriched data will have to dedup the
duplicate records, or else we will end up with stale enrichment.
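
On the consumer side, such a dedup could be as simple as keeping the latest
version per event. The sketch below assumes each click carries a unique event
id and each enriched record carries the time it was enriched; both fields and
the class name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical downstream store: keeps only the most recently enriched version of each event.
class LatestEnrichmentStore {

    static final class Enriched {
        final String eventId;
        final String status;
        final long enrichedAt;

        Enriched(String eventId, String status, long enrichedAt) {
            this.eventId = eventId;
            this.status = status;
            this.enrichedAt = enrichedAt;
        }
    }

    private final Map<String, Enriched> latestByEvent = new HashMap<>();

    // Returns true if the record replaced an older or missing version;
    // stale or repeated versions are ignored.
    boolean accept(Enriched record) {
        Enriched current = latestByEvent.get(record.eventId);
        if (current != null && current.enrichedAt >= record.enrichedAt) {
            return false;
        }
        latestByEvent.put(record.eventId, record);
        return true;
    }

    // Returns the current status for an event, or null if the event is unknown.
    String statusOf(String eventId) {
        Enriched record = latestByEvent.get(eventId);
        return record == null ? null : record.status;
    }
}
```

Because the newest version always wins, replaying the topic or receiving the
two records out of order leads to the same final state.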

Regards,
Vinay Patil


On Fri, Apr 24, 2020 at 12:14 PM Konstantin Knauf  wrote:

> Hi Vinay,
>
> I assume your subscription updates also have a timestamp and a watermark.
> Otherwise, there is no way for Flink to tell that the subscription updates
> are late.
>
> If you use a "temporal table "-style join to join the two streams, and you
> do not receive any subscription updates for 2 hours, the watermark will not
> advance (it is the minimum of the two input streams) and hence all click
> events will be buffered. No output. This has the advantage of not sending
> out duplicate records, but the disadvantage that you do not make any
> progress until you see fresh subscription data. Is this the desired
> behavior for your use case?
>
> Best,
>
> Konstantin
>
>
> On Thu, Apr 23, 2020 at 1:29 PM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I went through Konstantin webinar on 99 ways you can do enrichment. One
>> thing I am failing to understand is how do we efficiently handle stale data
>> enrichment.
>>
>> Context: Let's say I want to enrich user data with the subscription data.
>> Here subscription data is acting as reference data and will be used for
>> joining these two streams based on event time. Consider the following
>> scenario:
>>
>>
>>    1. We are going to enrich Click Stream event (containing user_info)
>>       with Subscription details
>>    2. Subscription Status for Alice user is FREE
>>    3. Current Internal State contains Alice with Subscription status as
>>       FREE
>>    4. Reference data is not flowing because of some issue for 2hrs
>>    5. Alice upgraded the subscription to Premium at 10.30 AM
>>    6. Watched video event comes for Alice at 10.40 AM
>>       - flink pipeline looks up in internal state and writes to enrichment
>>         topic
>>       - Enrichment topic now contains Alice -> FREE
>>    7. Reference data starts flowing in at 11AM
>>       - let's assume we consider late elements upto 2 hours, so the click
>>         stream event of Alice is still buffered in the state
>>       - The enrichment topic will now contain duplicate records for
>>         Alice because of multiple firings of window
>>          1. Alice -> FREE -> 10 AM
>>          2. Alice -> PREMIUM -> 11 AM
>>
>> Question is how do I avoid sending duplicate records ? I am not able to
>> understand it. I can think of Low Level joins but not sure how do we know
>> if it is stale data or not based on timestamp (watermark) as it can happen
>> that a particular enriched record is not updated for 6 hrs.
>>
>> Regards,
>> Vinay Patil
>>
>
>
> --
>
> Konstantin Knauf
>


Handling stale data enrichment

2020-04-23 Thread Vinay Patil
Hi,

I went through Konstantin's webinar on 99 ways you can do enrichment. One
thing I am failing to understand is how to efficiently handle stale data
enrichment.

Context: Let's say I want to enrich user data with the subscription data.
Here subscription data is acting as reference data and will be used for
joining these two streams based on event time. Consider the following
scenario:


   1. We are going to enrich Click Stream event (containing user_info) with
      Subscription details
   2. Subscription Status for Alice user is FREE
   3. Current Internal State contains Alice with Subscription status as FREE
   4. Reference data is not flowing because of some issue for 2hrs
   5. Alice upgraded the subscription to Premium at 10.30 AM
   6. Watched video event comes for Alice at 10.40 AM
      - flink pipeline looks up in internal state and writes to enrichment
        topic
      - Enrichment topic now contains Alice -> FREE
   7. Reference data starts flowing in at 11AM
      - let's assume we consider late elements upto 2 hours, so the click
        stream event of Alice is still buffered in the state
      - The enrichment topic will now contain duplicate records for Alice
        because of multiple firings of window
         1. Alice -> FREE -> 10 AM
         2. Alice -> PREMIUM -> 11 AM

The question is: how do I avoid sending duplicate records? I am not able to
figure it out. I can think of low-level joins, but I am not sure how we would
know whether the data is stale based on the timestamp (watermark), as it can
happen that a particular enriched record is not updated for 6 hrs.

Regards,
Vinay Patil


Streaming File Sink - Parquet File Writer

2019-10-29 Thread Vinay Patil
Hi,

I am not able to roll the files based on file size, as the bulk format uses
OnCheckpointRollingPolicy.

One way is to write a custom StreamingFileSink and provide a RollingPolicy as
RowFormatBuilder does. Is this the correct way to go?

Another way is to write a ParquetEncoder and use RowFormatBuilder.

P.S. I am curious to know why the RollingPolicy was not exposed for
BulkFormat.

Regards,
Vinay Patil


Re: Using STSAssumeRoleSessionCredentialsProvider for cross account access

2019-10-29 Thread Vinay Patil
Thanks Fabian,

@Gordon - Can you please help here.

Regards,
Vinay Patil


On Fri, Oct 25, 2019 at 9:11 PM Fabian Hueske  wrote:

> Hi Vinay,
>
> Maybe Gordon (in CC) has an idea about this issue.
>
> Best, Fabian
>
> On Thu, Oct 24, 2019 at 2:50 PM Vinay Patil <
> vinay18.pa...@gmail.com> wrote:
>
>> Hi,
>>
>> Can someone pls help here , facing issues in Prod . I see the following
>> ticket in unresolved state.
>>
>> https://issues.apache.org/jira/browse/FLINK-8417
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Thu, Oct 24, 2019 at 11:01 AM Vinay Patil 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to access dynamo streams from a different aws account but
>>> getting resource not found exception while trying to access the dynamo
>>> streams from Task Manager. I have provided the following configurations :
>>>
>>> dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.ASSUME_ROLE.name());
>>> dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN, dynamoDbConnect.getRoleArn());
>>> dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME, dynamoDbConnect.getRoleSessionName());
>>>
>>> In the main class I am able to get the arn of dynamoDb table
>>> using STSAssumeRoleSessionCredentialsProvider, so the assume role is
>>> working fine . Getting error only while accessing from TM.
>>>
>>> I assume that the credentials are not required to be passed :
>>> https://github.com/apache/flink/blob/abbd6b02d743486f3c0c1336139dd6b3edd20840/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L164
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>


DynamoStreams Consumer millisBehindLatest metric

2019-10-28 Thread Vinay Patil
Hi,

I am currently using FlinkDynamoDBStreamsConsumer in production. For
monitoring the lag I am relying on the millisBehindLatest metric, but it
always returns -1, even if the DynamoDB stream already contains a million
records.

Also, it would be great if we could add documentation mentioning that Flink
supports DynamoDB Streams.

Regards,
Vinay Patil


Re: Using STSAssumeRoleSessionCredentialsProvider for cross account access

2019-10-24 Thread Vinay Patil
Hi,

Can someone please help here? We are facing issues in production. I see that
the following ticket is still unresolved.

https://issues.apache.org/jira/browse/FLINK-8417


Regards,
Vinay Patil


On Thu, Oct 24, 2019 at 11:01 AM Vinay Patil 
wrote:

> Hi,
>
> I am trying to access dynamo streams from a different aws account but
> getting resource not found exception while trying to access the dynamo
> streams from Task Manager. I have provided the following configurations :
>
> dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.ASSUME_ROLE.name());
> dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN, dynamoDbConnect.getRoleArn());
> dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME, dynamoDbConnect.getRoleSessionName());
>
> In the main class I am able to get the arn of dynamoDb table
> using STSAssumeRoleSessionCredentialsProvider, so the assume role is
> working fine . Getting error only while accessing from TM.
>
> I assume that the credentials are not required to be passed :
> https://github.com/apache/flink/blob/abbd6b02d743486f3c0c1336139dd6b3edd20840/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L164
>
>
> Regards,
> Vinay Patil
>


Using STSAssumeRoleSessionCredentialsProvider for cross account access

2019-10-23 Thread Vinay Patil
Hi,

I am trying to access DynamoDB streams from a different AWS account, but I am
getting a resource-not-found exception while trying to access the streams
from the Task Manager. I have provided the following configuration:

dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.ASSUME_ROLE.name());
dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN, dynamoDbConnect.getRoleArn());
dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME, dynamoDbConnect.getRoleSessionName());

In the main class I am able to get the ARN of the DynamoDB table
using STSAssumeRoleSessionCredentialsProvider, so the assume role is
working fine. I get the error only when accessing from the TM.

I assume that the credentials are not required to be passed:
https://github.com/apache/flink/blob/abbd6b02d743486f3c0c1336139dd6b3edd20840/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L164


Regards,
Vinay Patil


Re: Consuming data from dynamoDB streams to flink

2019-08-08 Thread Vinay Patil
Hello,

For anyone looking to set up alerts for a Flink application, here is a
good blog post:
https://www.ververica.com/blog/monitoring-apache-flink-applications-101
So, for DynamoDB streams we can set the alert on millisBehindLatest.

Regards,
Vinay Patil


On Wed, Aug 7, 2019 at 2:24 PM Vinay Patil  wrote:

> Hi Andrey,
>
> Thank you for your reply, I understand that the checkpoints are gone when
> the job is cancelled or killed, may be configuring external checkpoints
> will help here so that we can resume from there.
>
> My points was if the job is terminated, and the stream position is set to
> TRIM_HORIZON , the consumer will start processing from the start, I was
> curious to know if there is a configuration like kafka_group_id that we can
> set for dynamoDB streams.
>
> Also, can you please let me know on which metrics should I generate an
> alert in case of DynamoDb Streams  (I am sending the metrics to
> prometheus), I see these metrics :
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
>
>
> For Example: in case of Kafka we generate  alert when the consumer is
> lagging behind.
>
>
> Regards,
> Vinay Patil
>
>
> On Fri, Jul 19, 2019 at 10:40 PM Andrey Zagrebin 
> wrote:
>
>> Hi Vinay,
>>
>> 1. I would assume it works similar to kinesis connector (correct me if
>> wrong, people who actually developed it)
>> 2. If you have activated just checkpointing, the checkpoints are gone if
>> you externally kill the job. You might be interested in savepoints [1]
>> 3. See paragraph in [2] about kinesis consumer parallelism
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kinesis.html#kinesis-consumer
>>
>> On Fri, Jul 19, 2019 at 1:45 PM Vinay Patil 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using this consumer for processing records from DynamoDb Streams ,
>>> few questions on this :
>>>
>>> 1. How does checkpointing works with Dstreams, since this class is
>>> extending FlinkKinesisConsumer, I am assuming it will start from the last
>>> successful checkpoint in case of failure, right ?
>>> 2. Currently when I kill the pipeline and start again it reads all the
>>> data from the start of the stream, is there any configuration to avoid this
>>> (apart from ConsumerConfigConstants.InitialPosition.LATEST), similar to
>>> group-id in Kafka.
>>> 3. As these DynamoDB Streams are separated by shards what is the
>>> recommended parallelism to be set for the source , should it be one to one
>>> mapping , for example if there are 3 shards , then parallelism should be 3 ?
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Wed, Aug 1, 2018 at 3:42 PM Ying Xu [via Apache Flink Mailing List
>>> archive.]  wrote:
>>>
>>>> Thank you so much Fabian!
>>>>
>>>> Will update status in the JIRA.
>>>>
>>>> -
>>>> Ying
>>>>
>>>> On Tue, Jul 31, 2018 at 1:37 AM, Fabian Hueske <[hidden email]> wrote:
>>>>
>>>> > Done!
>>>> >
>>>> > Thank you :-)
>>>> >
>>>> > 2018-07-31 6:41 GMT+02:00 Ying Xu <[hidden email]>:
>>>> >
>>>> > > Thanks Fabian and Thomas.
>>>> > >
>>>> > > Please assign FLINK-4582 to the following username:
>>>> > > *yxu-apache
>>>> > > <
>>>> https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache
>>>> > >*
>>>> > >
>>>> > > If needed I can get a ICLA or CCLA whichever is proper.
>>>> > >
>>>> > > *Ying Xu*
>>>> > > Software Engineer
>>>> > >
>>>> > > On Mon, Jul 30, 2018 at 8:31 PM, Thomas Weise <[hidden email]> wrote:
>>>> > >
>>>> > >

Re: Consuming data from dynamoDB streams to flink

2019-08-07 Thread Vinay Patil
Hi Andrey,

Thank you for your reply. I understand that the checkpoints are gone when
the job is cancelled or killed; maybe configuring externalized checkpoints
will help here so that we can resume from there.

My point was that if the job is terminated and the stream position is set to
TRIM_HORIZON, the consumer will start processing from the beginning. I was
curious to know if there is a configuration like Kafka's group id that we can
set for DynamoDB streams.

Also, can you please let me know which metrics I should alert on for
DynamoDB Streams (I am sending the metrics to Prometheus)? I see these
metrics:
https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java


For example, in case of Kafka we generate an alert when the consumer is
lagging behind.


Regards,
Vinay Patil


On Fri, Jul 19, 2019 at 10:40 PM Andrey Zagrebin 
wrote:

> Hi Vinay,
>
> 1. I would assume it works similar to kinesis connector (correct me if
> wrong, people who actually developed it)
> 2. If you have activated just checkpointing, the checkpoints are gone if
> you externally kill the job. You might be interested in savepoints [1]
> 3. See paragraph in [2] about kinesis consumer parallelism
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kinesis.html#kinesis-consumer
>
> On Fri, Jul 19, 2019 at 1:45 PM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I am using this consumer for processing records from DynamoDB Streams and
>> have a few questions:
>>
>> 1. How does checkpointing work with DynamoDB Streams? Since this class
>> extends FlinkKinesisConsumer, I am assuming it will start from the last
>> successful checkpoint in case of failure, right?
>> 2. Currently, when I kill the pipeline and start it again, it reads all the
>> data from the start of the stream. Is there any configuration to avoid this
>> (apart from ConsumerConfigConstants.InitialPosition.LATEST), similar to
>> group-id in Kafka?
>> 3. As these DynamoDB Streams are separated by shards, what is the
>> recommended parallelism for the source? Should it be a one-to-one
>> mapping, for example if there are 3 shards, then parallelism should be 3?
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Wed, Aug 1, 2018 at 3:42 PM Ying Xu [via Apache Flink Mailing List
>> archive.]  wrote:
>>
>>> Thank you so much Fabian!
>>>
>>> Will update status in the JIRA.
>>>
>>> -
>>> Ying
>>>
>>> On Tue, Jul 31, 2018 at 1:37 AM, Fabian Hueske <[hidden email]> wrote:
>>>
>>> > Done!
>>> >
>>> > Thank you :-)
>>> >
>>> > 2018-07-31 6:41 GMT+02:00 Ying Xu <[hidden email]>:
>>> >
>>> > > Thanks Fabian and Thomas.
>>> > >
>>> > > Please assign FLINK-4582 to the following username:
>>> > > *yxu-apache
>>> > > <
>>> https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache
>>> > >*
>>> > >
>>> > > If needed I can get a ICLA or CCLA whichever is proper.
>>> > >
>>> > > *Ying Xu*
>>> > > Software Engineer
>>> > >
>>> > > On Mon, Jul 30, 2018 at 8:31 PM, Thomas Weise <[hidden email]> wrote:
>>> > >
>>> > > > The user is yxu-lyft, Ying had commented on that JIRA as well.
>>> > > >
>>> > > > https://issues.apache.org/jira/browse/FLINK-4582
>>> > > >
>>> > > >
>>> > > > On Mon, Jul 30, 2018 at 1:25 AM Fabian Hueske <[hidden email]> wrote:
>>> > > >
>>> > > > > Hi Ying,
>>> > > > >
>>> > > > > Thanks for considering to contribute the connector!
>>> > > > >
>>> > > > > In general, you don't need special permissions to contribute to
>>> > Flink.
>>> 

Re: StackOverflow Error

2019-07-21 Thread Vinay Patil
Hi Ravi,

The uber jar was correct; setting the ClosureCleanerLevel to TOP_LEVEL resolved
this issue. Thanks a lot.

Is there any disadvantage to setting this explicitly?


Regards,
Vinay Patil


On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi Vinay,
>
> ObjectNode seems OK, as it is used by the Flink-provided
> "JsonNodeDeserializationSchema".
>
> Please verify that you are using the Maven dependency
> "flink-connector-kinesis" version 1.8.1 (with your Flink 1.8.1 cluster) and
> packaging this dependency as part of your application uber/fat jar. If you
> are already doing it this way, then please also try to set the closure
> cleaner level to "TOP_LEVEL" as below.
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);
>
>
> Regards,
> Ravi
>
> On Sat, Jul 20, 2019 at 1:53 PM Vinay Patil 
> wrote:
>
>> Hi Ravi,
>>
>> Tried with both new and legacy mode, it works locally but on cluster I am
>> getting this exception, I am passing jackson ObjectNode class, should be
>> serializable. What do you think?
>>
>> On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi Vinay,
>>>
>>> Please make sure that all your custom code is serializable. You can run
>>> this using new mode.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On Sat 20 Jul, 2019, 08:13 Vinay Patil,  wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to run a pipeline on Flink 1.8.1 ,getting the following
>>>> exception:
>>>>
>>>> java.lang.StackOverflowError
>>>>     at java.lang.Exception.<init>(Exception.java:66)
>>>>     at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
>>>>     at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
>>>>     at java.lang.Class.getDeclaredMethod(Class.java:2130)
>>>>     at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>>
>>>> I have even tried running in legacy mode, the pipeline code is :
>>>>
>>>> private void execute(String[] args) {
>>>>     ParameterTool pt = ParameterTool.fromArgs(args);
>>>>
>>>>     StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     //env.setMaxParallelism(30);
>>>>     env.setParallelism(20);
>>>>
>>>>     // Checkpoint every 5 seconds into a filesystem state backend.
>>>>     env.enableCheckpointing(5000);
>>>>     StateBackend backend =
>>>>             new FsStateBackend(pt.getRequired("checkpoint_path"), true);
>>>>     env.setStateBackend(backend);
>>>>
>>>>     FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
>>>>             new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
>>>>                     new JsonNodeDeserializationSchema(),
>>>>                     dynamodbStreamsConsumerConfig);
>>>>
>>>>     SingleOutputStreamOperator<ObjectNode> sourceStream = env
>>>>             .addSource(flinkDynamoDBStreamsConsumer)
>>>>             .name("Dynamo DB Streams");
>>>>
>>>>     sourceStream
>>>>             .keyBy(new CdcKeySelector())
>>>>             .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092",
>>>>                     "ums-dynamo-streams", new JsonSerializationSchema()))
>>>>             .name("Kafka Sink");
>>>>
>>>>     try {
>>>>         env.execute();
>>>>     } catch (Exception e) {
>>>>         System.out.println("Caught exception for pipeline: " + e.getMessage());
>>>>         e.printStackTrace();
>>>>     }
>>>> }
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>


Re: StackOverflow Error

2019-07-20 Thread Vinay Patil
Hi Ravi,

I tried with both new and legacy mode. It works locally, but on the cluster I
am getting this exception. I am passing the Jackson ObjectNode class, which
should be serializable. What do you think?

On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
ravibhushanratna...@gmail.com> wrote:

> Hi Vinay,
>
> Please make sure that all your custom code is serializable. You can run
> this using new mode.
>
> Thanks,
> Ravi
>
> On Sat 20 Jul, 2019, 08:13 Vinay Patil,  wrote:
>
>> Hi,
>>
>> I am trying to run a pipeline on Flink 1.8.1 ,getting the following
>> exception:
>>
>> java.lang.StackOverflowError
>>     at java.lang.Exception.<init>(Exception.java:66)
>>     at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
>>     at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
>>     at java.lang.Class.getDeclaredMethod(Class.java:2130)
>>     at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>
>> I have even tried running in legacy mode, the pipeline code is :
>>
>> private void execute(String[] args) {
>>     ParameterTool pt = ParameterTool.fromArgs(args);
>>
>>     StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>     //env.setMaxParallelism(30);
>>     env.setParallelism(20);
>>
>>     // Checkpoint every 5 seconds into a filesystem state backend.
>>     env.enableCheckpointing(5000);
>>     StateBackend backend =
>>             new FsStateBackend(pt.getRequired("checkpoint_path"), true);
>>     env.setStateBackend(backend);
>>
>>     FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
>>             new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
>>                     new JsonNodeDeserializationSchema(),
>>                     dynamodbStreamsConsumerConfig);
>>
>>     SingleOutputStreamOperator<ObjectNode> sourceStream = env
>>             .addSource(flinkDynamoDBStreamsConsumer)
>>             .name("Dynamo DB Streams");
>>
>>     sourceStream
>>             .keyBy(new CdcKeySelector())
>>             .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092",
>>                     "ums-dynamo-streams", new JsonSerializationSchema()))
>>             .name("Kafka Sink");
>>
>>     try {
>>         env.execute();
>>     } catch (Exception e) {
>>         System.out.println("Caught exception for pipeline: " + e.getMessage());
>>         e.printStackTrace();
>>     }
>> }
>>
>> Regards,
>> Vinay Patil
>>
>


StackOverflow Error

2019-07-20 Thread Vinay Patil
Hi,

I am trying to run a pipeline on Flink 1.8.1 and am getting the following
exception:

java.lang.StackOverflowError
    at java.lang.Exception.<init>(Exception.java:66)
    at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
    at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)

I have even tried running in legacy mode. The pipeline code is:

private void execute(String[] args) {
    ParameterTool pt = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    //env.setMaxParallelism(30);
    env.setParallelism(20);

    // Checkpoint every 5 seconds into a file system state backend
    // (the second argument enables asynchronous snapshots).
    env.enableCheckpointing(5000);
    StateBackend backend =
            new FsStateBackend(pt.getRequired("checkpoint_path"), true);
    env.setStateBackend(backend);

    // Source: DynamoDB Streams records deserialized as JSON nodes.
    FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
            new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
                    new JsonNodeDeserializationSchema(),
                    dynamodbStreamsConsumerConfig);

    SingleOutputStreamOperator<ObjectNode> sourceStream = env
            .addSource(flinkDynamoDBStreamsConsumer)
            .name("Dynamo DB Streams");

    // Sink: key the records and write them to Kafka.
    sourceStream
            .keyBy(new CdcKeySelector())
            .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092",
                    "ums-dynamo-streams", new JsonSerializationSchema()))
            .name("Kafka Sink");

    try {
        env.execute();
    } catch (Exception e) {
        System.out.println("Caught exception for pipeline: " + e.getMessage());
        e.printStackTrace();
    }
}
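
A note on the trace above: the repeated ClosureCleaner.clean frames show a recursive walk that keeps descending until the stack runs out. The sketch below is not Flink's ClosureCleaner code. It is a minimal, JDK-only illustration of one way a recursive field walk can fail to terminate (a cyclic object graph) and how an identity-based visited set bounds it. The names CycleSafeWalk, Node and countReachable are hypothetical, and whether a cycle is the actual cause in this pipeline is an assumption.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class CycleSafeWalk {

    // Hypothetical type used only to build a two-element cycle.
    static class Node {
        Node next;
    }

    // Counts the distinct objects reachable from root through instance fields.
    static int countReachable(Object root) {
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        walk(root, visited);
        return visited.size();
    }

    private static void walk(Object obj, Set<Object> visited) {
        // Without the visited check, a -> b -> a would recurse until
        // a StackOverflowError is thrown.
        if (obj == null || !visited.add(obj)) {
            return;
        }
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                continue;
            }
            try {
                field.setAccessible(true);
                walk(field.get(obj), visited);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();
        a.next = b;
        b.next = a; // cyclic reference
        System.out.println(countReachable(a));
    }
}
```

The sketch only walks the fields of its own Node class; reflecting into JDK classes may be restricted on recent Java versions.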

Regards,
Vinay Patil


Re: Consuming data from dynamoDB streams to flink

2019-07-19 Thread Vinay Patil
Hi,

I am using this consumer to process records from DynamoDB Streams and have a
few questions:

1. How does checkpointing work with DynamoDB Streams? Since this class
extends FlinkKinesisConsumer, I assume it will resume from the last
successful checkpoint after a failure, right?
2. Currently, when I kill the pipeline and start it again, it reads all the
data from the start of the stream. Is there any configuration to avoid this
(apart from ConsumerConfigConstants.InitialPosition.LATEST), similar to
group-id in Kafka?
3. As DynamoDB Streams are divided into shards, what is the recommended
parallelism for the source? Should it be a one-to-one mapping, for example a
parallelism of 3 if there are 3 shards?
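
To make question 3 concrete, here is a small JDK-only sketch of how shards spread over source subtasks. It assumes a simple round-robin rule (shard i goes to subtask i % parallelism); the connector's real assignment logic may differ, and the names ShardAssignmentSketch, assign and idleSubtasks are hypothetical. The one point that holds under any assignment rule is that with more subtasks than shards, some subtasks receive no shard.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardAssignmentSketch {

    // Assumed rule for illustration only: shard i is read by subtask i % parallelism.
    static List<List<Integer>> assign(int shardCount, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int shard = 0; shard < shardCount; shard++) {
            perSubtask.get(shard % parallelism).add(shard);
        }
        return perSubtask;
    }

    // Number of subtasks that end up with no shard to read.
    static long idleSubtasks(int shardCount, int parallelism) {
        return assign(shardCount, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 3));        // one shard per subtask
        System.out.println(idleSubtasks(3, 5));  // subtasks left without a shard
    }
}
```

Under this model a parallelism equal to the shard count gives each subtask exactly one shard, while a lower parallelism makes some subtasks read several shards.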


Regards,
Vinay Patil


On Wed, Aug 1, 2018 at 3:42 PM Ying Xu [via Apache Flink Mailing List
archive.]  wrote:

> Thank you so much Fabian!
>
> Will update status in the JIRA.
>
> -
> Ying
>
> On Tue, Jul 31, 2018 at 1:37 AM, Fabian Hueske <[hidden email]> wrote:
>
> > Done!
> >
> > Thank you :-)
> >
> > 2018-07-31 6:41 GMT+02:00 Ying Xu <[hidden email]>:
> >
> > > Thanks Fabian and Thomas.
> > >
> > > Please assign FLINK-4582 to the following username:
> > > yxu-apache
> > > <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache>
> > >
> > > If needed I can get a ICLA or CCLA whichever is proper.
> > >
> > > *Ying Xu*
> > > Software Engineer
> > > 510.368.1252 <+15103681252>
> > > [image: Lyft] <http://www.lyft.com/>
> > >
> > > On Mon, Jul 30, 2018 at 8:31 PM, Thomas Weise <[hidden email]> wrote:
> > >
> > > > The user is yxu-lyft, Ying had commented on that JIRA as well.
> > > >
> > > > https://issues.apache.org/jira/browse/FLINK-4582
> > > >
> > > >
> > > > On Mon, Jul 30, 2018 at 1:25 AM Fabian Hueske <[hidden email]> wrote:
> > > >
> > > > > Hi Ying,
> > > > >
> > > > > Thanks for considering to contribute the connector!
> > > > >
> > > > > > In general, you don't need special permissions to contribute to Flink.
> > > > > > Anybody can open Jiras and PRs.
> > > > > > You only need to be assigned to the Contributor role in Jira to be able to
> > > > > > assign an issue to you.
> > > > >
> > > > > It would also be good if you could submit a CLA [1] if you plan to
> > > > > contribute a larger feature.
> > > > >
> > > > > Thanks, Fabian
> > > > >
> > > > > [1] https://www.apache.org/licenses/#clas
> > > > >
> > > > >
> > > > > 2018-07-30 10:07 GMT+02:00 Ying Xu <[hidden email]
> <http:///user/SendEmail.jtp?type=node=23597=4>>:
> > > > >
> > > > > > Hello Flink dev:
> > > > > >
> > > > > > > We have implemented the prototype design and the initial PoC worked
> > > > > > > pretty well.  Currently, we plan to move ahead with this design in our
> > > > > > > internal production system.
> > > > > > >
> > > > > > > We are thinking of contributing this connector back to the flink
> > > > > > > community sometime soon.  May I request to be granted with a contributor
> > > > > > > role?
> > > > > >
> > > > > > Many thanks in advance.
> > > > > >
> > > > > > *Ying Xu*
> > > > > > Software Engineer
> > > > > > 510.368.1252 <+15103681252>
> > > > > > [image: Lyft] <http://www.lyft.com/>
> > > > > >
> > > > > > > On Fri, Jul 6, 2018 at 6:23 PM, Ying Xu <[hidden email]> wrote:
> > > > > >
> > > > > > > Hi Gordon:
> > > > > > >
> > > > > > > Cool. Thanks for the thumb-up!
> > > > > > >
> > > > > > > We will include some

Re: EXT :Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-17 Thread Vinay Patil
Hi Stephan,

Yes, we tried setting fs.s3a.aws.credentials.provider, but we are getting a
class not found exception for InstanceProfileCredentialsProvider because of a
shading issue.
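
For context, the prefix-based forwarding that Stephan describes in the quoted reply below can be pictured as a simple key filter. This is a JDK-only sketch, not the code in S3FileSystemFactory: the class and method names are hypothetical, the trailing dots on the prefixes are an assumption, and any key rewriting the real factory performs is not modelled. The provider class name used in main is only an illustrative value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class S3ConfigForwardingSketch {

    // Prefixes named in the thread; the trailing dots are an assumption.
    private static final String[] FORWARDED_PREFIXES = {"s3.", "s3a.", "fs.s3a."};

    // Returns only the entries whose key starts with one of the forwarded prefixes.
    static Map<String, String> forwarded(Map<String, String> flinkConf) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            for (String prefix : FORWARDED_PREFIXES) {
                if (entry.getKey().startsWith(prefix)) {
                    result.put(entry.getKey(), entry.getValue());
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Illustrative entries standing in for lines of flink-conf.yaml.
        conf.put("fs.s3a.aws.credentials.provider",
                "com.amazonaws.auth.InstanceProfileCredentialsProvider");
        conf.put("taskmanager.numberOfTaskSlots", "4");
        System.out.println(forwarded(conf).keySet());
    }
}
```

The filter only shows which keys would reach the Hadoop file system; it says nothing about whether the provider class can be loaded, which is the shading problem reported here.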


Regards,
Vinay Patil


On Thu, Jan 17, 2019 at 3:02 PM Stephan Ewen  wrote:

> Regarding configurations: According to the code [1]
> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java#L50>,
> all config keys starting with "s3", "s3a" and "fs.s3a" are forwarded from
> the flink-conf.yaml to the Hadoop file systems.
>
> Regarding profile-based authentication: Have you tried to set the
> credentials provider explicitly, by setting "
> fs.s3a.aws.credentials.provider" ?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java#L50
>
> On Thu, Jan 17, 2019 at 12:57 AM Martin, Nick  wrote:
>
>> Does that mean that the full set of fs.s3a.<…> configs in core-site.xml
>> will be picked up from flink-conf.yaml by flink? Or only certain configs
>> involved with authentication?
>>
>>
>>
>> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
>> *Sent:* Wednesday, January 16, 2019 3:43 AM
>> *To:* Vinay Patil 
>> *Cc:* Kostas Kloudas ; Dawid Wysakowicz <
>> dwysakow...@apache.org>; Taher Koitawala [via Apache Flink User Mailing
>> List archive.] ; user <
>> user@flink.apache.org>
>> *Subject:* EXT :Re: StreamingFileSink cannot get AWS S3 credentials
>>
>>
>>
>> I haven't configured this myself but I would guess that you need to set
>> the parameters defined here under S3A Authentication methods [1]. If the
>> environment variables don't work, then I would try to set the
>> authentication properties.
>>
>>
>>
>> [1]
>> https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A
>>
>>
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Wed, Jan 16, 2019 at 11:09 AM Vinay Patil 
>> wrote:
>>
>> Hi Till,
>>
>>
>>
>> Can you please let us know the configurations that we need to set for
>> Profile based credential provider in flink-conf.yaml
>>
>>
>>
>> Exporting AWS_PROFILE property on EMR did not work.
>>
>>
>>
>> Regards,
>>
>> Vinay Patil
>>
>>
>>
>>
>>
>> On Wed, Jan 16, 2019 at 3:05 PM Till Rohrmann 
>> wrote:
>>
>> The old BucketingSink was using Hadoop's S3 filesystem directly whereas
>> the new StreamingFileSink uses Flink's own FileSystem which need to be
>> configured via the flink-conf.yaml.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Wed, Jan 16, 2019 at 10:31 AM Vinay Patil 
>> wrote:
>>
>> Hi Till,
>>
>>
>>
>> We are not providing `fs.s3a.access.key: access_key`, `fs.s3a.secret.key:
>> secret_key` in flink-conf.yaml as we are using Profile based credentials
>> provider. The older BucketingSink code is able to get the credentials and
>> write to S3. We are facing this issue only with StreamingFileSink. We tried
>> adding fs.s3a.impl to core-site.xml when the default configurations were
>> not working.
>>
>>
>>
>> Regards,
>>
>> Vinay Patil
>>
>>
>>
>>
>>
>> On Wed, Jan 16, 2019 at 2:55 PM Till Rohrmann 
>> wrote:
>>
>> Hi Vinay,
>>
>>
>>
>> Flink's file systems are self contained and won't respect the
>> core-site.xml if I'm not mistaken. Instead you have to set the credentials
>> in the flink configuration flink-conf.yaml via `fs.s3a.access.key:
>> access_key`, `fs.s3a.secret.key: secret_key` and so on [1]. Have you tried
>> this out?
>>
>>
>>
>> This has been fixed with Flink 1.6.2 and 1.7.0 [2].
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
>>
>> [2] https://issues.apache.org/jira/browse/FLINK-10383
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas <
>> k.klou...@da-platform.com> wrote:
>>
>> Hi Taher,
>>
>>
>>
>> So you are using the same configuration files and everything and the only
>> thing you change is the "s3://" to "s3a://" and the sink cannot find the
>> cred

Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-16 Thread Vinay Patil
Hi Till,

Can you please let us know which configuration we need to set in
flink-conf.yaml for a profile-based credentials provider?

Exporting AWS_PROFILE on EMR did not work.

Regards,
Vinay Patil


On Wed, Jan 16, 2019 at 3:05 PM Till Rohrmann  wrote:

> The old BucketingSink was using Hadoop's S3 filesystem directly whereas
> the new StreamingFileSink uses Flink's own FileSystem which need to be
> configured via the flink-conf.yaml.
>
> Cheers,
> Till
>
> On Wed, Jan 16, 2019 at 10:31 AM Vinay Patil 
> wrote:
>
>> Hi Till,
>>
>> We are not providing `fs.s3a.access.key: access_key`, `fs.s3a.secret.key:
>> secret_key` in flink-conf.yaml as we are using Profile based credentials
>> provider. The older BucketingSink code is able to get the credentials and
>> write to S3. We are facing this issue only with StreamingFileSink. We tried
>> adding fs.s3a.impl to core-site.xml when the default configurations were
>> not working.
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Wed, Jan 16, 2019 at 2:55 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Vinay,
>>>
>>> Flink's file systems are self contained and won't respect the
>>> core-site.xml if I'm not mistaken. Instead you have to set the credentials
>>> in the flink configuration flink-conf.yaml via `fs.s3a.access.key:
>>> access_key`, `fs.s3a.secret.key: secret_key` and so on [1]. Have you tried
>>> this out?
>>>
>>> This has been fixed with Flink 1.6.2 and 1.7.0 [2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
>>> [2] https://issues.apache.org/jira/browse/FLINK-10383
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas <
>>> k.klou...@da-platform.com> wrote:
>>>
>>>> Hi Taher,
>>>>
>>>> So you are using the same configuration files and everything and the
>>>> only thing you change is the "s3://" to "s3a://" and the sink cannot find
>>>> the credentials?
>>>> Could you please provide the logs of the Task Managers?
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Wed, Jan 16, 2019 at 9:13 AM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> Forgot to cc ;)
>>>>> On 16/01/2019 08:51, Vinay Patil wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Can someone please help on this issue. We have even tried to set
>>>>> fs.s3a.impl in core-site.xml, still its not working.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>>
>>>>> On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User
>>>>> Mailing List archive.]  wrote:
>>>>>
>>>>>> Hi All,
>>>>>>  We have implemented S3 sink in the following way:
>>>>>>
>>>>>> StreamingFileSink sink= StreamingFileSink.forBulkFormat(new
>>>>>> Path("s3a://mybucket/myfolder/output/"),
>>>>>> ParquetAvroWriters.forGenericRecord(schema))
>>>>>> .withBucketCheckInterval(50l).withBucketAssigner(new
>>>>>> CustomBucketAssigner()).build();
>>>>>>
>>>>>> The problem we are facing is that StreamingFileSink is initializing
>>>>>> S3AFileSystem class to write to s3 and is not able to find the s3
>>>>>> credentials to write data, However other flink application on the
>>>>>> same cluster use "s3://" paths are able to write data to the same s3 
>>>>>> bucket
>>>>>> and folders, we are only facing this issue with StreamingFileSink.
>>>>>>
>>>>>> Regards,
>>>>>> Taher Koitawala
>>>>>> GS Lab Pune
>>>>>> +91 8407979163
>>>>>>
>>>>>>
>>>>>> --
>>>>>> If you reply to this email, your message will be added to the
>>>>>> discussion below:
>>>>>>
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/StreamingFileSink-cannot-get-AWS-S3-credentials-tp25464.html
>>>>>> To start a new topic under Apache Flink User Mailing List archive.,
>>>>>> email ml+s2336050n1...@n4.nabble.com
>>>>>> To unsubscribe from Apache Flink User Mailing List archive., click
>>>>>> here
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code=1=dmluYXkxOC5wYXRpbEBnbWFpbC5jb218MXwxODExMDE2NjAx>
>>>>>> .
>>>>>> NAML
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer=instant_html%21nabble%3Aemail.naml=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>>>
>>>>>


Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-16 Thread Vinay Patil
Hi Till,

We are not setting `fs.s3a.access.key: access_key` and `fs.s3a.secret.key:
secret_key` in flink-conf.yaml because we are using a profile-based
credentials provider. The older BucketingSink code is able to get the
credentials and write to S3; we are facing this issue only with
StreamingFileSink. We tried adding fs.s3a.impl to core-site.xml when the
default configuration was not working.

Regards,
Vinay Patil


On Wed, Jan 16, 2019 at 2:55 PM Till Rohrmann  wrote:

> Hi Vinay,
>
> Flink's file systems are self contained and won't respect the
> core-site.xml if I'm not mistaken. Instead you have to set the credentials
> in the flink configuration flink-conf.yaml via `fs.s3a.access.key:
> access_key`, `fs.s3a.secret.key: secret_key` and so on [1]. Have you tried
> this out?
>
> This has been fixed with Flink 1.6.2 and 1.7.0 [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
> [2] https://issues.apache.org/jira/browse/FLINK-10383
>
> Cheers,
> Till
>
> On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas 
> wrote:
>
>> Hi Taher,
>>
>> So you are using the same configuration files and everything and the only
>> thing you change is the "s3://" to "s3a://" and the sink cannot find the
>> credentials?
>> Could you please provide the logs of the Task Managers?
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Jan 16, 2019 at 9:13 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Forgot to cc ;)
>>> On 16/01/2019 08:51, Vinay Patil wrote:
>>>
>>> Hi,
>>>
>>> Can someone please help on this issue. We have even tried to set
>>> fs.s3a.impl in core-site.xml, still its not working.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User
>>> Mailing List archive.]  wrote:
>>>
>>>> Hi All,
>>>>  We have implemented S3 sink in the following way:
>>>>
>>>> StreamingFileSink sink= StreamingFileSink.forBulkFormat(new
>>>> Path("s3a://mybucket/myfolder/output/"),
>>>> ParquetAvroWriters.forGenericRecord(schema))
>>>> .withBucketCheckInterval(50l).withBucketAssigner(new
>>>> CustomBucketAssigner()).build();
>>>>
>>>> The problem we are facing is that StreamingFileSink is initializing
>>>> S3AFileSystem class to write to s3 and is not able to find the s3
>>>> credentials to write data, However other flink application on the same
>>>> cluster use "s3://" paths are able to write data to the same s3 bucket and
>>>> folders, we are only facing this issue with StreamingFileSink.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>> GS Lab Pune
>>>> +91 8407979163
>>>>
>>>>
>>>


Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-15 Thread Vinay Patil
Hi,

Can someone please help with this issue? We have even tried setting
fs.s3a.impl in core-site.xml, but it is still not working.

Regards,
Vinay Patil


On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User
Mailing List archive.]  wrote:

> Hi All,
>  We have implemented S3 sink in the following way:
>
> StreamingFileSink sink= StreamingFileSink.forBulkFormat(new
> Path("s3a://mybucket/myfolder/output/"),
> ParquetAvroWriters.forGenericRecord(schema))
> .withBucketCheckInterval(50l).withBucketAssigner(new
> CustomBucketAssigner()).build();
>
> The problem we are facing is that StreamingFileSink is initializing
> S3AFileSystem class to write to s3 and is not able to find the s3
> credentials to write data, However other flink application on the same
> cluster use "s3://" paths are able to write data to the same s3 bucket and
> folders, we are only facing this issue with StreamingFileSink.
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>
>


Re: AvroInputFormat Serialisation Issue

2018-11-09 Thread Vinay Patil
Hi,

Changing the classloader config to parent-first solved the issue.

Regards,
Vinay Patil


On Wed, Nov 7, 2018 at 7:25 AM Vinay Patil  wrote:

> Hi,
>
> Can someone please help here.
>
> On Nov 6, 2018 10:46 PM, "Vinay Patil [via Apache Flink User Mailing List
> archive.]"  wrote:
>
>> Hi,
>>
>> I am facing a similar issue today with Flink 1.6.0 - AvroOutputFormat
>>
>> AvroOutputFormat tuple2AvroOutputFormat = new
>> AvroOutputFormat<>(
>> new Path(""), GenericRecord.class);
>>
>> testDataSet
>> .map(new GenerateGenericRecord())
>> .returns(AvroTypeInfo.of(GenericRecord.class))
>> .output(tuple2AvroOutputFormat);
>>
>> Following is the exception (I have enabled forceAvro config , not sure
>> why
>> it still goes to Kyro Serializer)
>>
>> com.esotericsoftware.kryo.KryoException: Error constructing instance of
>> class: org.apache.avro.Schema$LockableArrayList
>> Serialization trace:
>> types (org.apache.avro.Schema$UnionSchema)
>> schema (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
>>
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
>>
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>
>> Please let me know if there is a fix for this issue as I have not faced
>> this
>> problem for DataStreams.
>>
>> Regards,
>> Vinay Patil
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AvroInputFormat-Serialisation-Issue-tp20146p24314.html
>> To start a new topic under Apache Flink User Mailing List archive., email
>> ml+s2336050n1...@n4.nabble.com
>> To unsubscribe from Apache Flink User Mailing List archive., click here
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code=1=dmluYXkxOC5wYXRpbEBnbWFpbC5jb218MXwxODExMDE2NjAx>
>> .
>> NAML
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer=instant_html%21nabble%3Aemail.naml=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>
>


Re: AvroInputFormat Serialisation Issue

2018-11-06 Thread Vinay Patil
Hi,

I am facing a similar issue today with AvroOutputFormat on Flink 1.6.0:

AvroOutputFormat<GenericRecord> tuple2AvroOutputFormat =
        new AvroOutputFormat<>(new Path(""), GenericRecord.class);

testDataSet
        .map(new GenerateGenericRecord())
        .returns(AvroTypeInfo.of(GenericRecord.class))
        .output(tuple2AvroOutputFormat);

Following is the exception (I have enabled the forceAvro config, so I am not
sure why it still goes to the Kryo serializer):

com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

Please let me know if there is a fix for this issue as I have not faced this
problem for DataStreams.

Regards,
Vinay Patil




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unable to start session cluster using Docker

2018-10-06 Thread Vinay Patil
Thank you Till, I am able to start the session-cluster now.

Regards,
Vinay Patil


On Fri, Oct 5, 2018 at 8:15 PM Till Rohrmann  wrote:

> Hi Vinay,
>
> are you referring to flink-contrib/docker-flink/docker-compose.yml? We
> recently fixed the command line parsing with Flink 1.5.4 and 1.6.1. Due to
> this, the removal of the second command line parameter intended to be
> introduced with 1.5.0 and 1.6.0 (see
> https://issues.apache.org/jira/browse/FLINK-8696) became visible. The
> docker-compose.yml file has not yet been updated. I will do this right away
> and push the changes to the 1.5, 1.6 and master branch. Sorry for the
> inconveniences. As a local fix for you, please go to
> flink-contrib/docker-flink/docker-entrypoint.sh:33 and remove the cluster
> parameter from this line.
>
> Cheers,
> Till
>
> On Thu, Oct 4, 2018 at 8:30 PM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I have used the docker-compose file for creating the cluster as shown in
>> the documentation. The web ui is started successfully, however, the task
>> managers are unable to join.
>>
>> Job Manager container logs:
>>
>> 018-10-04 18:13:13,907 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest
>> endpoint listening at cluster:8081
>>
>> 2018-10-04 18:13:13,907 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
>> http://cluster:8081 was granted leadership with
>> leaderSessionID=----
>>
>> 2018-10-04 18:13:13,907 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web
>> frontend listening at http://cluster:8081
>>
>> 2018-10-04 18:13:14,012 INFO
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> ResourceManager akka.tcp://flink@cluster:6123/user/resourcemanager was
>> granted leadership with fencing token 
>>
>> 2018-10-04 18:13:14,013 INFO
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
>> Starting the SlotManager.
>>
>> 2018-10-04 18:13:14,026 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
>> akka.tcp://flink@cluster:6123/user/dispatcher was granted leadership
>> with fencing token ----
>>
>> Not sure why it says Web Frontend listening at cluster:8081 when the job
>> manager rpc address is specified to jobmanager
>>
>> Task Manager Container Logs:
>>
>> 018-10-04 18:19:18,818 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
>> to ResourceManager akka.tcp://flink@jobmanager
>> :6123/user/resourcemanager().
>>
>> 2018-10-04 18:19:18,818 INFO
>> org.apache.flink.runtime.filecache.FileCache  - User file
>> cache uses directory
>> /tmp/flink-dist-cache-1bd95c51-3031-42ab-b782-14a0023921e5
>>
>> 2018-10-04 18:19:28,850 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>> resolve ResourceManager address 
>> akka.tcp://flink@jobmanager:6123/user/resourcemanager,
>> retrying in 1 ms: Ask timed out on
>> [ActorSelection[Anchor(akka.tcp://flink@jobmanager:6123/),
>> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
>> of type "akka.actor.Identify".
>>
>>
>> I have even tried to set JOB_MANAGER_RPC_ADDRESS=cluster in   in
>> docker-compose file, it does not work.
>> Even "cluster" and "jobmanager" points to localhost in /etc/hosts file.
>>
>> Can you please let me know what is the issue here.
>>
>> Regards,
>> Vinay Patil
>>
>


Unable to start session cluster using Docker

2018-10-04 Thread Vinay Patil
Hi,

I have used the docker-compose file for creating the cluster as shown in
the documentation. The web ui is started successfully, however, the task
managers are unable to join.

Job Manager container logs:

2018-10-04 18:13:13,907 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Rest endpoint listening at cluster:8081

2018-10-04 18:13:13,907 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - http://cluster:8081 was granted leadership with leaderSessionID=----

2018-10-04 18:13:13,907 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Web frontend listening at http://cluster:8081

2018-10-04 18:13:14,012 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@cluster:6123/user/resourcemanager was granted leadership with fencing token

2018-10-04 18:13:14,013 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.

2018-10-04 18:13:14,026 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher akka.tcp://flink@cluster:6123/user/dispatcher was granted leadership with fencing token ----

I am not sure why it says the web frontend is listening at cluster:8081 when
the job manager RPC address is set to jobmanager.

Task Manager Container Logs:

2018-10-04 18:19:18,818 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/resourcemanager().

2018-10-04 18:19:18,818 INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /tmp/flink-dist-cache-1bd95c51-3031-42ab-b782-14a0023921e5

2018-10-04 18:19:28,850 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Could not resolve ResourceManager address akka.tcp://flink@jobmanager:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@jobmanager:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify".


I have even tried setting JOB_MANAGER_RPC_ADDRESS=cluster in the
docker-compose file, but it does not work.
Both "cluster" and "jobmanager" point to localhost in the /etc/hosts file.

Can you please let me know what the issue is here?

Regards,
Vinay Patil


Re: Checkpointing not happening in Standalone HA mode

2018-07-27 Thread Vinay Patil
Hi Vino,

Yes, the job runs successfully; however, no checkpoints succeed. I will
update the source.
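
The abort message quoted below ("Collection Source is not being executed at the moment. Aborting checkpoint.") can be read as a precondition check: a checkpoint is only triggered while every source task is still running, and a collection source finishes as soon as its elements are emitted. The following JDK-only sketch models that rule in a simplified form. It is not Flink's checkpoint coordinator code, and the names CheckpointTriggerSketch, TaskState and canTriggerCheckpoint are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointTriggerSketch {

    // Simplified task states; the real runtime has more.
    enum TaskState { RUNNING, FINISHED }

    // Assumed rule: a checkpoint can start only if all source tasks are running.
    static boolean canTriggerCheckpoint(List<TaskState> sourceTasks) {
        for (TaskState state : sourceTasks) {
            if (state != TaskState.RUNNING) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // An unbounded source (for example Kafka) keeps running.
        System.out.println(canTriggerCheckpoint(
                Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)));
        // A collection source that has already emitted everything is finished.
        System.out.println(canTriggerCheckpoint(
                Arrays.asList(TaskState.RUNNING, TaskState.FINISHED)));
    }
}
```

Under this model, replacing the collection source with a long-running one is what lets checkpoints proceed, which matches the suggestion in the reply below.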

Regards,
Vinay Patil


On Fri, Jul 27, 2018 at 2:00 PM vino yang  wrote:

> Hi Vinay,
>
> Oh!  You use a collection source? That's the problem. Please use a general
> source like Kafka or others. Maybe your checkpoint has not be triggered,
> your job has stopped.
>
> Thanks, vino.
>
> 2018-07-27 16:07 GMT+08:00 Vinay Patil :
>
>> Hi Vino,
>>
>> Yes I am enabling checkpoint in the code as follows :
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.createRemoteEnvironment(",,getJobConfiguration(),jarPath");
>>
>>
>> env.enableCheckpointing(1000);
>>
>> env.setStateBackend(new 
>> FsStateBackend("file:///"));
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>
>>
>> In getJobConfiguration method I have set HA related properties like
>> HA_STORAGE_PATH,HA_ZOOKEEPER_QUORUM,HA_ZOOKEEPER_ROOT,HA_MODE,HA_JOB_MANAGER_PORT_RANGE,HA_CLUSTER_ID
>>
>>
>> I can see the error in Job Manager logs where it says Collection Source
>> is not being executed at the moment. Aborting checkpoint. In the pipeline I
>> have a stream initialized using "fromCollection". I think I will have to
>> get rid of this.
>>
>> What do you suggest
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Thu, Jul 26, 2018 at 12:04 PM vino yang  wrote:
>>
>>> Hi Vinay:
>>>
>>> Did you call the specific config API described in this documentation [1]?
>>>
>>> Can you share your job program and JM log? Does the JM log contain a
>>> message matching the pattern "Triggering checkpoint {} @ {} for job {}."?
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-25 19:43 GMT+08:00 Chesnay Schepler :
>>>
>>>> Can you provide us with the job code?
>>>>
>>>> I assume that checkpointing runs properly if you submit the same job to
>>>> a normal cluster?
>>>>
>>>>
>>>> On 25.07.2018 13:15, Vinay Patil wrote:
>>>>
>>>> No error in the logs. That is why I am not able to understand why
>>>> checkpoints are not getting triggered.
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>>
>>>> On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
>>>> wrote:
>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> No error in the logs. That is why I am not able to understand why
>>>>> checkpoints are getting triggered.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>>
>>>>> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>>> Please check the job- and taskmanager logs for anything suspicious.
>>>>>>
>>>>>> On 25.07.2018 12:33, Vinay Patil wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am starting the cluster using bootstrap application where in I am
>>>>>> calling Job Manager and Task Manager main class to form the cluster. The
>>>>>> HA cluster is formed correctly and I am able to submit jobs to this
>>>>>> cluster using RemoteExecutionEnvironment but when I enable checkpointing
>>>>>> in code I do not see any checkpoints triggered on Flink UI.
>>>>>>
>>>>>> Am I missing any configurations to be set for the
>>>>>> RemoteExecutionEnvironment for checkpointing to work.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>


Re: Checkpointing not happening in Standalone HA mode

2018-07-27 Thread Vinay Patil
Hi Vino,

Yes I am enabling checkpoint in the code as follows :

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(",,getJobConfiguration(),jarPath");


env.enableCheckpointing(1000);

env.setStateBackend(new FsStateBackend("file:///"));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);


In getJobConfiguration method I have set HA related properties like
HA_STORAGE_PATH,HA_ZOOKEEPER_QUORUM,HA_ZOOKEEPER_ROOT,HA_MODE,HA_JOB_MANAGER_PORT_RANGE,HA_CLUSTER_ID


I can see the error in Job Manager logs where it says Collection Source is
not being executed at the moment. Aborting checkpoint. In the pipeline I
have a stream initialized using "fromCollection". I think I will have to
get rid of this.

What do you suggest?

Regards,
Vinay Patil


On Thu, Jul 26, 2018 at 12:04 PM vino yang  wrote:

> Hi Vinay:
>
> Did you call the specific config API described in this documentation [1]?
>
> Can you share your job program and JM log? Does the JM log contain a
> message matching the pattern "Triggering checkpoint {} @ {} for job {}."?
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>
> Thanks, vino.
>
> 2018-07-25 19:43 GMT+08:00 Chesnay Schepler :
>
>> Can you provide us with the job code?
>>
>> I assume that checkpointing runs properly if you submit the same job to a
>> normal cluster?
>>
>>
>> On 25.07.2018 13:15, Vinay Patil wrote:
>>
>> No error in the logs. That is why I am not able to understand why
>> checkpoints are not getting triggered.
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
>> wrote:
>>
>>> Hi Chesnay,
>>>
>>> No error in the logs. That is why I am not able to understand why
>>> checkpoints are getting triggered.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
>>> wrote:
>>>
>>>> Please check the job- and taskmanager logs for anything suspicious.
>>>>
>>>> On 25.07.2018 12:33, Vinay Patil wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am starting the cluster using bootstrap application where in I am
>>>> calling Job Manager and Task Manager main class to form the cluster. The HA
>>>> cluster is formed correctly and I am able to submit jobs to this cluster
>>>> using RemoteExecutionEnvironment but when I enable checkpointing in code I
>>>> do not see any checkpoints triggered on Flink UI.
>>>>
>>>> Am I missing any configurations to be set for the
>>>> RemoteExecutionEnvironment for checkpointing to work.
>>>>
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>>
>>>>
>>
>


Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Vinay Patil
Hi Chesnay,

No error in the logs. That is why I am not able to understand why
checkpoints are getting triggered.

Regards,
Vinay Patil


On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler  wrote:

> Please check the job- and taskmanager logs for anything suspicious.
>
> On 25.07.2018 12:33, Vinay Patil wrote:
>
> Hi,
>
> I am starting the cluster using bootstrap application where in I am
> calling Job Manager and Task Manager main class to form the cluster. The HA
> cluster is formed correctly and I am able to submit jobs to this cluster
> using RemoteExecutionEnvironment but when I enable checkpointing in code I
> do not see any checkpoints triggered on Flink UI.
>
> Am I missing any configurations to be set for the
> RemoteExecutionEnvironment for checkpointing to work.
>
>
> Regards,
> Vinay Patil
>
>
>


Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Vinay Patil
No error in the logs. That is why I am not able to understand why
checkpoints are not getting triggered.

Regards,
Vinay Patil


On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil  wrote:

> Hi Chesnay,
>
> No error in the logs. That is why I am not able to understand why
> checkpoints are getting triggered.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
> wrote:
>
>> Please check the job- and taskmanager logs for anything suspicious.
>>
>> On 25.07.2018 12:33, Vinay Patil wrote:
>>
>> Hi,
>>
>> I am starting the cluster using bootstrap application where in I am
>> calling Job Manager and Task Manager main class to form the cluster. The HA
>> cluster is formed correctly and I am able to submit jobs to this cluster
>> using RemoteExecutionEnvironment but when I enable checkpointing in code I
>> do not see any checkpoints triggered on Flink UI.
>>
>> Am I missing any configurations to be set for the
>> RemoteExecutionEnvironment for checkpointing to work.
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>>


Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Vinay Patil
Hi,

I am starting the cluster using a bootstrap application in which I call the
Job Manager and Task Manager main classes to form the cluster. The HA cluster
is formed correctly and I am able to submit jobs to it using
RemoteExecutionEnvironment, but when I enable checkpointing in code I do not
see any checkpoints triggered in the Flink UI.

Am I missing any configuration that must be set on the
RemoteExecutionEnvironment for checkpointing to work?


Regards,
Vinay Patil


Re: Query regarding rest.port property

2018-07-25 Thread Vinay Patil
Thanks Chesnay for your inputs.

I am actually starting the cluster using a bootstrap application in which I
call the Job Manager and Task Manager main classes to form the cluster.

So I have removed the flink-runtime-web dependency and used only the
flink_runtime dependency to form the cluster, but I am still not able to hit
the REST APIs. Is there anything else I can do here?

Yes, you are right about separating the APIs into two parts.

Regards,
Vinay Patil


On Sat, Jul 21, 2018 at 1:46 AM Chesnay Schepler  wrote:

> Something that I was thinking about a while ago was to separate the REST
> API into 2 parts;
> one for monitoring (getting details for a job etc.),
> one for modifying state (submitting/canceling jobs, uploading jars, etc.)
>
> This may better fit your requirements.
>
> On 20.07.2018 22:13, Chesnay Schepler wrote:
>
> Effectively you can't disable them selectively; reason being that they are
> actually one and the same.
> The ultimate solution is to build flink-dist yourself, and exclude
> "flink-runtime-web" from it, which removes
> the required files.
>
> Note that being able to selectively disable them _for security reasons_
> wouldn't do you much good as far as I can tell; if you have access to the
> REST API you can do anything the UI does. Similarly, if you can restrict
> access to the REST API, you also do that for the UI.
>
> On 20.07.2018 18:14, Vino yang wrote:
>
> Hi Vinay,
>
> Did job manager run in node "myhost"? Did you check the port you specified
> open for remote access?
>
> Can you try to start web UI, but just forbid its port?
>
> 
> Vino yang
> Thanks.
>
>
> On 2018-07-20 22:48 , Vinay Patil  Wrote:
>
> Hi,
>
> We have disabled Flink Web UI for security reasons however we want to use
> REST Api for monitoring purpose. For that I have set jobmanager.web.port =
> -1 , rest.port=, rest.address=myhost
>
> But I am not able to access any REST api using https://myhost:/
>
> Is it mandatory to have Flink Web UI running or am I missing any
> configuration ?
>
> Regards,
> Vinay Patil
>
>
>
>


Query regarding rest.port property

2018-07-20 Thread Vinay Patil
Hi,

We have disabled the Flink Web UI for security reasons; however, we want to
use the REST API for monitoring purposes. For that I have set
jobmanager.web.port = -1 , rest.port=, rest.address=myhost

But I am not able to access any REST API using https://myhost:/

Is it mandatory to have the Flink Web UI running, or am I missing any
configuration?

Regards,
Vinay Patil
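
For anyone reproducing this, a small probe can help separate "the REST
endpoint is not listening" from "the URL or scheme is wrong". The following is
a minimal sketch using only the JDK. The class name RestProbe is hypothetical,
port 8081 is a placeholder because the rest.port value is not shown above, and
/jobs/overview is assumed to be one of the monitoring paths served by the REST
API of the Flink version in use. Whether http or https applies depends on the
SSL settings, which this mail does not state.

```java
import java.net.HttpURLConnection;
import java.net.URI;

class RestProbe {
    // Builds the URL to probe from its parts.
    static URI endpoint(String scheme, String host, int port, String path) {
        return URI.create(scheme + "://" + host + ":" + port + path);
    }

    // Issues a GET and returns the HTTP status code.
    static int status(URI uri) throws Exception {
        HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(5000);
        connection.setReadTimeout(5000);
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed defaults; pass the real scheme and port as arguments.
        String scheme = args.length > 0 ? args[0] : "http";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
        URI uri = endpoint(scheme, "myhost", port, "/jobs/overview");
        System.out.println(uri + " -> HTTP " + status(uri));
    }
}
```

A connection refused error here would point at the listener itself, which
fits the reply in this thread that the Web UI and the REST API are served by
the same component. With https and a private CA, the JVM running the probe
also needs a truststore that contains the issuing certificate.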


Re: Strictly use TLSv1.2

2018-06-22 Thread Vinay Patil
Hi Fabian,

Created a JIRA ticket : https://issues.apache.org/jira/browse/FLINK-9643

Regards,
Vinay Patil


On Fri, Jun 22, 2018 at 1:25 PM Fabian Hueske  wrote:

> Hi Vinay,
>
> This looks like a bug.
> Would you mind creating a Jira ticket [1] for this issue?
>
> Thank you very much,
> Fabian
>
> [1] https://issues.apache.org/jira/projects/FLINK
>
> 2018-06-21 9:25 GMT+02:00 Vinay Patil :
>
>> Hi,
>>
>> I have deployed Flink 1.3.2 and enabled SSL settings. From the ssl debug
>> logs it shows that Flink is using TLSv1.2. However based on the security
>> scans we have observed that it also allows TLSv1.0 and TLSv1.1.
>>
>> In order to strictly use TLSv1.2 we have updated the following property of
>>
>> java.security file:
>> jdk.tls.disabledAlgorithms=MD5, SSLv3, DSA, RSA keySize < 2048, TLSv1,
>> TLSv1.1
>>
>> But still it allows TLSv1.1 , verified this by hitting the following
>> command
>> from master node:
>>
>> openssl s_client -connect taskmanager1: -tls1
>>
>> (here listening_address_port is part of
>> akka.ssl.tcp://flink@taskmanager1:port/user/taskmanager)
>>
>> Now, when I hit the above command for the data port, it does not allow
>> TLSv1.1 and only allows TLSv1.2
>>
>> Can you please let me know how can I enforce all the flink ports to use
>> TLSv1.2.
>>
>> Regards,
>> Vinay Patil
>>
>
>


Re: Strictly use TLSv1.2

2018-06-21 Thread Vinay Patil
Hi,

I have deployed Flink 1.3.2 and enabled SSL settings. From the ssl debug
logs it shows that Flink is using TLSv1.2. However based on the security
scans we have observed that it also allows TLSv1.0 and TLSv1.1.

In order to strictly use TLSv1.2 we have updated the following property of
java.security file:
jdk.tls.disabledAlgorithms=MD5, SSLv3, DSA, RSA keySize < 2048, TLSv1,
TLSv1.1

But it still allows TLSv1.1; I verified this by running the following command
from the master node:

openssl s_client -connect taskmanager1: -tls1

(here listening_address_port is part of
akka.ssl.tcp://flink@taskmanager1:port/user/taskmanager)

Now, when I hit the above command for the data port, it does not allow
TLSv1.1 and only allows TLSv1.2

Can you please let me know how I can enforce TLSv1.2 on all the Flink
ports?

Regards,
Vinay Patil


Re: Strictly use TLSv1.2

2018-06-21 Thread Vinay Patil
Hi,

Can someone please help me with this issue.

Regards,
Vinay Patil



Re: Plain text SSL passwords in Log file

2018-03-29 Thread Vinay Patil
I have created FLINK-9111 <https://issues.apache.org/jira/browse/FLINK-9111> as
this is not handled in the latest code of GlobalConfiguration.

Regards,
Vinay Patil

On Thu, Mar 29, 2018 at 8:33 AM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi,
>
> If this is not part of Flink 1.5 or not handled in latest 1.4.2 release, I
> can open a JIRA. Should be a small change.
>
> What do you think ?
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 28, 2018 at 4:11 PM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi Greg,
>>
>> I am not concerned with flink-conf.yaml file, we have taken care of the
>> passwords there by replacing them with placeholders. We are picking the
>> passwords from our vault.
>>
>> The main issue is that Flink is printing these passwords in plain text in
>> log file. It should be simple check to not print the ssl passwords .
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 28, 2018 at 3:53 PM, Greg Hogan <c...@greghogan.com> wrote:
>>
>>> With the current method you always have the risk, no matter which
>>> keywords you filter on ("secret", "password", etc.), that the key name is
>>> mistyped and inadvertently logged.
>>>
>>> Perhaps we could implement something like TravisCI's encryption keys [
>>> https://docs.travis-ci.com/user/encryption-keys/] at a cost of added
>>> complexity.
>>>
>>> On Wed, Mar 28, 2018 at 4:38 PM, Vinay Patil <vinay18.pa...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I see plain text SSL passwords in log file (printed by
>>>> GlobalConfiguration) , because of which we cannot deploy our pipeline to NR
>>>> environment.
>>>>
>>>> I am able to avoid this by having ERROR log level for this class but
>>>> the security team still think it is a risk.
>>>>
>>>> Is this taken care in the new release ? (I am using Flink 1.3.2)
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>
>>>
>>
>


Re: Plain text SSL passwords in Log file

2018-03-29 Thread Vinay Patil
Hi,

If this is not part of Flink 1.5 or not handled in latest 1.4.2 release, I
can open a JIRA. Should be a small change.

What do you think ?

Regards,
Vinay Patil

On Wed, Mar 28, 2018 at 4:11 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Greg,
>
> I am not concerned with flink-conf.yaml file, we have taken care of the
> passwords there by replacing them with placeholders. We are picking the
> passwords from our vault.
>
> The main issue is that Flink is printing these passwords in plain text in
> log file. It should be simple check to not print the ssl passwords .
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 28, 2018 at 3:53 PM, Greg Hogan <c...@greghogan.com> wrote:
>
>> With the current method you always have the risk, no matter which
>> keywords you filter on ("secret", "password", etc.), that the key name is
>> mistyped and inadvertently logged.
>>
>> Perhaps we could implement something like TravisCI's encryption keys [
>> https://docs.travis-ci.com/user/encryption-keys/] at a cost of added
>> complexity.
>>
>> On Wed, Mar 28, 2018 at 4:38 PM, Vinay Patil <vinay18.pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I see plain text SSL passwords in log file (printed by
>>> GlobalConfiguration) , because of which we cannot deploy our pipeline to NR
>>> environment.
>>>
>>> I am able to avoid this by having ERROR log level for this class but the
>>> security team still think it is a risk.
>>>
>>> Is this taken care in the new release ? (I am using Flink 1.3.2)
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>
>>
>


Re: Plain text SSL passwords in Log file

2018-03-28 Thread Vinay Patil
Hi Greg,

I am not concerned with flink-conf.yaml file, we have taken care of the
passwords there by replacing them with placeholders. We are picking the
passwords from our vault.

The main issue is that Flink is printing these passwords in plain text in
the log file. It should be a simple check to avoid printing the SSL passwords.

Regards,
Vinay Patil
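
The kind of check meant here can be sketched as follows. This is a minimal
illustration using only the JDK, not the code in GlobalConfiguration: the
class name, the mask string and the output format are all hypothetical. It
hides the value of any key whose name contains "password" or "secret".

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class ConfigLogMasker {
    static final String MASK = "******";

    // A key is treated as sensitive if its name contains one of the keywords.
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        return lower.contains("password") || lower.contains("secret");
    }

    // Renders one configuration entry for logging, hiding sensitive values.
    static String render(String key, String value) {
        return key + ", " + (isSensitive(key) ? MASK : value);
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("security.ssl.enabled", "true");
        config.put("security.ssl.keystore-password", "example-value");
        config.put("security.ssl.truststore-password", "example-value");
        config.forEach((key, value) -> System.out.println(render(key, value)));
    }
}
```

As Greg points out in the quoted reply below, a keyword filter like this
cannot catch a mistyped key name, so it reduces the exposure rather than
removing it.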

On Wed, Mar 28, 2018 at 3:53 PM, Greg Hogan <c...@greghogan.com> wrote:

> With the current method you always have the risk, no matter which keywords
> you filter on ("secret", "password", etc.), that the key name is mistyped
> and inadvertently logged.
>
> Perhaps we could implement something like TravisCI's encryption keys [
> https://docs.travis-ci.com/user/encryption-keys/] at a cost of added
> complexity.
>
> On Wed, Mar 28, 2018 at 4:38 PM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I see plain text SSL passwords in log file (printed by
>> GlobalConfiguration) , because of which we cannot deploy our pipeline to NR
>> environment.
>>
>> I am able to avoid this by having ERROR log level for this class but the
>> security team still think it is a risk.
>>
>> Is this taken care in the new release ? (I am using Flink 1.3.2)
>>
>> Regards,
>> Vinay Patil
>>
>
>


Plain text SSL passwords in Log file

2018-03-28 Thread Vinay Patil
Hi,

I see plain text SSL passwords in the log file (printed by
GlobalConfiguration), because of which we cannot deploy our pipeline to the NR
environment.

I am able to avoid this by setting the ERROR log level for this class, but the
security team still thinks it is a risk.

Is this taken care of in the new release? (I am using Flink 1.3.2)

Regards,
Vinay Patil


Unable to see more than 5 jobs on Flink Dashboard

2018-03-28 Thread Vinay Patil
Hi,

I am not able to see more than 5 jobs on the Flink Dashboard.
I have set web.history to 50 in the flink-conf.yaml file.

Is there any other configuration I have to set to see more jobs on the Flink
Dashboard?


Regards,
Vinay Patil


Re: Flink SSL Setup on a standalone cluster

2018-03-23 Thread Vinay Patil
Hi,

The passwords are shown in plain text in the logs. Is this fixed in newer
versions of Flink? (I am using 1.3.2)

Also, please let me know the answer to my previous queries in this mail
chain.

Regards,
Vinay Patil

On Mon, Mar 19, 2018 at 7:35 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi,
>
> When I set ssl.verify.hostname to true , the job fails with SSL handshake
> exception where it tries to match the IP address  instead of the hostname
> in the certificates. Everything works when I set this to false. The
> keystore is created with FQDN.
> The solution of adding all the hostnames and IP addresses in SAN list is
> discarded by the company.
>
> And a security concern is raised when I set this parameter to false. I see
> this https://issues.apache.org/jira/browse/FLINK-5030 in Unresolved
> state.
> How do Flink support hostname verification ?
>
> @Chesnay : It would be helpful to know the answer to my previous mail
>
> Regards,
> Vinay Patil
>
> On Fri, Mar 16, 2018 at 10:15 AM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi Chesnay,
>>
>> After setting the configurations for Remote Execution Environment the job
>> gets submitted ,I had to set ssl-verify-hostname to false.
>> However, I don't understand why there is a need to do it. I am running
>> the job from master node itself and providing all the configurations in
>> flink-conf.yaml while creating the cluster. So why do I have to copy the
>> same stuff in code ?
>>
>> Regards,
>> Vinay Patil
>>
>> On Fri, Mar 16, 2018 at 8:23 AM, Vinay Patil <vinay18.pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> No I am not passing any config to the remote execution environment. I am
>>> running the job from master node itself. I have provided SSL configs in
>>> flink-conf.yaml
>>>
>>> Do I need to specify any SSL.config as part of Remote Execution env ?
>>>
>>> If yes can you please provide me an example.
>>>
>>>
>>>
>>> On Mar 16, 2018 1:56 AM, "Chesnay Schepler [via Apache Flink User
>>> Mailing List archive.]" <ml+s2336050n1895...@n4.nabble.com> wrote:
>>>
>>> How are you creating the remote environment? In particular, are passing
>>> a configuration to the RemoteEnvironment?
>>> Have you set the SSL options in the config?
>>>
>>>
>>> On 15.03.2018 22:46, Vinay Patil wrote:
>>>
>>> Hi,
>>>
>>> Even tried with ip-address for JobManager.host.name property, but did
>>> not work. When I tried netstat -anp | grep 6123 , I see 3 TM connection
>>> state as established, however when I submit the job , I see two more
>>> entries with state as TIME_WAIT and after some time these entries are gone
>>> and I get a Lost to Job Manager Exception.
>>>
>>> This only happens when SSL is enabled.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Mar 15, 2018 at 10:28 AM, Vinay Patil <[hidden email]> wrote:
>>>
>>>> Just an update,  I am submitting the job from the master node, not
>>>> using the normal flink run command to submit the job , but using Remote
>>>> Execution Environment in code to do this.
>>>>
>>>> And in that I am passing the hostname which is same as provided in
>>>> flink-conf.yaml
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Thu, Mar 15, 2018 at 7:57 AM, Vinay Patil <[hidden email]> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> Any suggestions here
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Wed, Mar 14, 2018 at 8:08 PM, Vinay Patil <[hidden email]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> After waiting for some time I got the exception as Lost Connection to
>>>>>> Job Manager. Message: Could not retrieve the JobExecutionResult from Job
>>>>>> Manager
>>>>>>
>>>>>> I am submitting the job as remote execution environment. I have
>>>>>> specified the exact hostname of JobManager and port as 6123.
>>>>>>
>>>>>> Please let me know if any other configuratio

Re: Flink SSL Setup on a standalone cluster

2018-03-19 Thread Vinay Patil
Hi,

When I set ssl.verify.hostname to true, the job fails with an SSL handshake
exception where it tries to match the IP address instead of the hostname
in the certificates. Everything works when I set this to false. The
keystore is created with the FQDN.
The solution of adding all the hostnames and IP addresses to the SAN list was
rejected by the company.

A security concern is raised when I set this parameter to false. I see
https://issues.apache.org/jira/browse/FLINK-5030 in Unresolved state.
How does Flink support hostname verification?

@Chesnay: It would be helpful to know the answer to my previous mail.

Regards,
Vinay Patil

On Fri, Mar 16, 2018 at 10:15 AM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Chesnay,
>
> After setting the configurations for Remote Execution Environment the job
> gets submitted ,I had to set ssl-verify-hostname to false.
> However, I don't understand why there is a need to do it. I am running the
> job from master node itself and providing all the configurations in
> flink-conf.yaml while creating the cluster. So why do I have to copy the
> same stuff in code ?
>
> Regards,
> Vinay Patil
>
> On Fri, Mar 16, 2018 at 8:23 AM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> No I am not passing any config to the remote execution environment. I am
>> running the job from master node itself. I have provided SSL configs in
>> flink-conf.yaml
>>
>> Do I need to specify any SSL.config as part of Remote Execution env ?
>>
>> If yes can you please provide me an example.
>>
>>
>>
>> On Mar 16, 2018 1:56 AM, "Chesnay Schepler [via Apache Flink User Mailing
>> List archive.]" <ml+s2336050n1895...@n4.nabble.com> wrote:
>>
>> How are you creating the remote environment? In particular, are passing a
>> configuration to the RemoteEnvironment?
>> Have you set the SSL options in the config?
>>
>>
>> On 15.03.2018 22:46, Vinay Patil wrote:
>>
>> Hi,
>>
>> Even tried with ip-address for JobManager.host.name property, but did
>> not work. When I tried netstat -anp | grep 6123 , I see 3 TM connection
>> state as established, however when I submit the job , I see two more
>> entries with state as TIME_WAIT and after some time these entries are gone
>> and I get a Lost to Job Manager Exception.
>>
>> This only happens when SSL is enabled.
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Mar 15, 2018 at 10:28 AM, Vinay Patil <[hidden email]> wrote:
>>
>>> Just an update,  I am submitting the job from the master node, not using
>>> the normal flink run command to submit the job , but using Remote Execution
>>> Environment in code to do this.
>>>
>>> And in that I am passing the hostname which is same as provided in
>>> flink-conf.yaml
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Mar 15, 2018 at 7:57 AM, Vinay Patil <[hidden email]> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> Any suggestions here
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Wed, Mar 14, 2018 at 8:08 PM, Vinay Patil <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> After waiting for some time I got the exception as Lost Connection to
>>>>> Job Manager. Message: Could not retrieve the JobExecutionResult from Job
>>>>> Manager
>>>>>
>>>>> I am submitting the job as remote execution environment. I have
>>>>> specified the exact hostname of JobManager and port as 6123.
>>>>>
>>>>> Please let me know if any other configurations are needed.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Wed, Mar 14, 2018 at 11:48 AM, Vinay Patil <[hidden email]> wrote:
>>>>>
>>>>>> Hi Timo,
>>>>>>
>>>>>> Not getting any exception , it just says waiting for job completion
>>>>>> with a Job ID printed.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 11:

Re: Flink SSL Setup on a standalone cluster

2018-03-15 Thread Vinay Patil
Hi,

I even tried using the IP address for the JobManager.host.name property, but it
did not work. When I ran netstat -anp | grep 6123, I saw 3 TM connections in
the ESTABLISHED state; however, when I submit the job, I see two more entries
in the TIME_WAIT state, and after some time these entries are gone and I get a
lost connection to Job Manager exception.

This only happens when SSL is enabled.

Regards,
Vinay Patil

On Thu, Mar 15, 2018 at 10:28 AM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Just an update,  I am submitting the job from the master node, not using
> the normal flink run command to submit the job , but using Remote Execution
> Environment in code to do this.
>
> And in that I am passing the hostname which is same as provided in
> flink-conf.yaml
>
> Regards,
> Vinay Patil
>
> On Thu, Mar 15, 2018 at 7:57 AM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> Any suggestions here
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 14, 2018 at 8:08 PM, Vinay Patil <vinay18.pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> After waiting for some time I got the exception as Lost Connection to
>>> Job Manager. Message: Could not retrieve the JobExecutionResult from Job
>>> Manager
>>>
>>> I am submitting the job as remote execution environment. I have
>>> specified the exact hostname of JobManager and port as 6123.
>>>
>>> Please let me know if any other configurations are needed.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Mar 14, 2018 at 11:48 AM, Vinay Patil <vinay18.pa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Timo,
>>>>
>>>> Not getting any exception , it just says waiting for job completion
>>>> with a Job ID printed.
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Wed, Mar 14, 2018 at 11:34 AM, Timo Walther [via Apache Flink User
>>>> Mailing List archive.] <ml+s2336050n18909...@n4.nabble.com> wrote:
>>>>
>>>>> Hi Vinay,
>>>>>
>>>>> do you have any exception or log entry that describes the failure?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> Am 14.03.18 um 15:51 schrieb Vinay Patil:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have keystore for each of the 4 nodes in cluster and respective
>>>>> trustore. The cluster is configured correctly with SSL , verified this by
>>>>> accessing job manager using https and also see the TM path as
>>>>> akka.ssl.tcp, however the job is not getting submitted to the cluster.
>>>>>
>>>>> I am not allowed to import the certificate to the java default
>>>>> trustore, so I have provided the trustore and keystore as jvm args to the
>>>>> job.
>>>>>
>>>>> Is there any other configuration I should do so that the job is
>>>>> submitted
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Flink SSL Setup on a standalone cluster

2018-03-15 Thread Vinay Patil
Just an update: I am submitting the job from the master node, not with
the normal flink run command, but by using a Remote Execution
Environment in code.

In that I am passing the hostname, which is the same as the one provided in
flink-conf.yaml.

Regards,
Vinay Patil

On Thu, Mar 15, 2018 at 7:57 AM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Guys,
>
> Any suggestions here
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 14, 2018 at 8:08 PM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> After waiting for some time I got the exception as Lost Connection to Job
>> Manager. Message: Could not retrieve the JobExecutionResult from Job Manager
>>
>> I am submitting the job as remote execution environment. I have specified
>> the exact hostname of JobManager and port as 6123.
>>
>> Please let me know if any other configurations are needed.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 14, 2018 at 11:48 AM, Vinay Patil <vinay18.pa...@gmail.com>
>> wrote:
>>
>>> Hi Timo,
>>>
>>> Not getting any exception , it just says waiting for job completion with
>>> a Job ID printed.
>>>
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Mar 14, 2018 at 11:34 AM, Timo Walther [via Apache Flink User
>>> Mailing List archive.] <ml+s2336050n18909...@n4.nabble.com> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> do you have any exception or log entry that describes the failure?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> Am 14.03.18 um 15:51 schrieb Vinay Patil:
>>>>
>>>> Hi,
>>>>
>>>> I have a keystore for each of the 4 nodes in the cluster and the
>>>> respective truststore. The cluster is configured correctly with SSL; I
>>>> verified this by accessing the job manager over https, and the TM path
>>>> shows as akka.ssl.tcp. However, the job is not getting submitted to the
>>>> cluster.
>>>>
>>>> I am not allowed to import the certificate into the Java default
>>>> truststore, so I have provided the truststore and keystore as JVM args to
>>>> the job.
>>>>
>>>> Is there any other configuration I should do so that the job is
>>>> submitted?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>


Flink SSL Setup on a standalone cluster

2018-03-14 Thread Vinay Patil
Hi,

I have a keystore for each of the 4 nodes in the cluster and the respective
truststore. The cluster is configured correctly with SSL; I verified this by
accessing the job manager over https, and the TM path shows as akka.ssl.tcp.
However, the job is not getting submitted to the cluster.

I am not allowed to import the certificate into the Java default truststore,
so I have provided the truststore and keystore as JVM args to the job.

Is there any other configuration I should do so that the job is submitted?

Regards,
Vinay Patil


Regarding Task Slots allocation

2018-02-16 Thread Vinay Patil
Hi,

I am trying to deploy a Flink job to a remote cluster using
remoteExecutionEnvironment. I have set the number of task slots for the
Task Manager to 2, so ideally it should have taken only 2 slots; however,
all the slots are getting utilized. Is there any other configuration I have
to do?

Regards,
Vinay Patil


Concurrent modification Exception when submitting multiple jobs

2018-02-15 Thread Vinay Patil
Hi,

I am submitting job to the cluster (using remote execution env) from
multiple threads. I am getting the following exception


java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:128)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:121)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1526)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
at com.test.executors.FlinkExecutor.submitJobToCluster(FlinkExecutor.java:67)


I am using Flink 1.3.2, and I am making sure that the job name is different
for each job. 
Can you please let me know if I am doing something wrong.

Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
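
The trace above fails inside an ArrayList iterator, which is what happens when
a list is structurally modified while it is being iterated. The following is a
minimal plain-Java sketch of that failure mode only; it does not use Flink. It
assumes (the thread does not confirm this) that the submitting threads share
one execution environment, so that one thread adds to its internal list while
another iterates it. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SharedListDemo {

    // Adds to the list while iterating it: the same structural modification
    // that a second thread would cause on a list shared between submissions.
    public static boolean failsWhenModifiedDuringIteration() {
        List<String> shared = new ArrayList<>(Arrays.asList("source", "map", "sink"));
        try {
            for (String step : shared) {
                shared.add(step + "-copy"); // structural change mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Each submission builds and iterates its own list, so nothing is shared.
    public static int iterateOwnList() {
        List<String> own = new ArrayList<>(Arrays.asList("source", "map", "sink"));
        int seen = 0;
        for (String step : own) {
            if (!step.isEmpty()) {
                seen++;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(failsWhenModifiedDuringIteration()); // true
        System.out.println(iterateOwnList());                   // 3
    }
}
```

If the assumption holds, creating a separate environment per submitting thread
(or serializing the submissions) would avoid the shared list.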


Re: Get the JobID when Flink job fails

2018-02-06 Thread Vinay Patil
Hi,

I see we can generate our own JobID, but how do I use it to submit the job
to the cluster?

I am using remoteExecutionEnvironment to submit the job to the cluster.

Also, could you please answer the question from my earlier mail?

Regards,
Vinay Patil

On Thu, Feb 1, 2018 at 1:50 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

> Hi,
>
> When the Flink job executes successfully I get the jobID, however when the
> Flink job fails the jobID is not returned.
>
> How do I get the jobId in this case ?
>
> Do I need to call /joboverview REST api to get the job ID by  looking for
> the Job Name ?
>
> Regards,
> Vinay Patil
>


Get the JobID when Flink job fails

2018-02-01 Thread Vinay Patil
Hi,

When the Flink job executes successfully I get the jobID, however when the
Flink job fails the jobID is not returned.

How do I get the jobId in this case ?

Do I need to call /joboverview REST api to get the job ID by  looking for
the Job Name ?

Regards,
Vinay Patil


Re: Send ACK when all records of file are processed

2018-01-25 Thread Vinay Patil
Hi Piotrek,

Thank you for your detailed answer.

Yes, I want to generate the ack when all the records of the file are
written to DB.

So, to check my understanding: the ack operator will receive a single EOF
watermark once all the operators before it have processed all the records of
the file. But as I understand watermarks, each parallel instance of an
operator emits its own watermark. How do I make sure that EOF has really been
reached, or will I receive only one watermark at the ack operator?


So the pipeline topology will look like

DataStream readFileStream = env.readFile()

readFileStream
 .transform(// ContinuousFileReaderOperator)
 .keyBy(0)
 .map(// enrichment)
 .addSink(// DB)

Instead of addSink, should it be a simple map operator which writes to the
DB, so that we can have a subsequent ack operator which generates the response?

Also, how do I access the Watermark value in the ack operator? It will be a
simple map operator, right?





Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> As you figured out, some dummy EOF record is one solution, however you
> might try to achieve it also by wrapping an existing CSV function. Your
> wrapper could emit this dummy EOF record. Another (probably better) idea is
> to use Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or
> ContinuousFileReaderOperator will do that for you, so you would just need
> to handle the Watermark.
>
> The question is, do you need to perform the ACK operation AFTER all of the
> DB writes, or just after reading the CSV file? If the latter one, you could
> add some custom ACK operator with parallelism one just after the CSV source
> that waits for the EOF Watermark.
>
> If it is the first one (some kind of committing the DB writes), you would
> need to wait until the EOF passes through all of your operators. You
> would need something like this:
>
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
>
> I hope this helps,
> Piotrek
>
> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> Hi Guys,
>
> Following is how my pipeline looks (DataStream API) :
>
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
>
> [1] reads the data in sequence as it has single parallelism and then I
> have default parallelism for the other operators.
>
> I want to generate a response (ack) when all the data of the file is
> processed. How can I achieve this ?
>
> One solution I can think of is to have an EOF dummy record in the file and a
> field whose value is the same for all the records in that file. Doing a keyBy
> on this field will make sure that all records are sent to a single slot. So,
> when the EOF dummy record is read I can generate a response/ack.
>
> Is there a better way I can deal with this ?
>
>
> Regards,
> Vinay Patil
>
>
>
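
On the question of several parallel instances each emitting a watermark: an
operator with multiple inputs advances its own watermark to the minimum of the
latest watermarks seen on each input, so a parallelism-1 ack operator only
observes Long.MAX_VALUE after every upstream instance has emitted it. The
following is a plain-Java simulation of that rule, not Flink API code; the
class and method names are hypothetical and the three-channel setup is just an
example.

```java
import java.util.Arrays;

public class MinWatermarkTracker {

    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    public MinWatermarkTracker(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one upstream instance and returns true only
    // once the combined (minimum) watermark has reached Long.MAX_VALUE,
    // i.e. every upstream instance has signalled end of input.
    public boolean onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min);
        return current == Long.MAX_VALUE;
    }

    public long currentWatermark() {
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkTracker ack = new MinWatermarkTracker(3);
        System.out.println(ack.onWatermark(0, Long.MAX_VALUE)); // false
        System.out.println(ack.onWatermark(1, Long.MAX_VALUE)); // false
        System.out.println(ack.onWatermark(2, Long.MAX_VALUE)); // true
    }
}
```

In a real job the equivalent check would live in a custom operator that reacts
to watermarks; a plain map function does not see them.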


Send ACK when all records of file are processed

2018-01-24 Thread Vinay Patil
Hi Guys,

Following is how my pipeline looks (DataStream API) :

[1] Read the data from the csv file
[2] KeyBy it by some id
[3] Do the enrichment and write it to DB

[1] reads the data in sequence as it has single parallelism and then I have
default parallelism for the other operators.

I want to generate a response (ack) when all the data of the file is
processed. How can I achieve this ?

One solution I can think of is to have an EOF dummy record in the file and a
field whose value is the same for all the records in that file. Doing a keyBy
on this field will make sure that all records are sent to a single slot. So,
when the EOF dummy record is read I can generate a response/ack.

Is there a better way I can deal with this ?


Regards,
Vinay Patil


Re: S3 Write Execption

2017-12-05 Thread vinay patil
Hi Stephan,

I am facing an S3 consistency-related issue; the exception is pasted at the
end.

We were able to solve the S3 sync issue by adding System.currentTimeMillis()
to the inprogressPrefix, inprogressSuffix, s3PendingPrefix and s3PendingSuffix
properties of BucketingSink.

I also tried another approach: updating the BucketingSink code so that the
partPath variable has System.currentTimeMillis() appended (in the
openNewPartFile method).

Can you please let me know if this is the correct approach to get rid of this
exception?

TimerException{java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3:///part-0-0inprogress}
 at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3:///part-0-0inprogress
 at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
 at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
 at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
 at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
 at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
 at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
 at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
 at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
 at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
 at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)


Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Serialization issues with DataStreamUtils

2017-08-31 Thread vinay patil
Hi,

After adding the following two lines the serialization trace does not show
the Schema related classes:

env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class,
Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
   
env.getConfig().addDefaultKryoSerializer(Schema.class,Serializers.AvroSchemaSerializer.class);

However, I still get an exception for:
Serialization trace:
schema (org.apache.avro.generic.GenericData$Record)

The default Kryo serializer is not able to serialize the GenericData.Record
class.

Is there any other way I can get rid of this exception?

P.S. I do not see this exception when I run the actual pipeline; it only
occurs in one of our test cases.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Elastic Sink AWS ES

2017-08-27 Thread vinay patil
Hi,

We have recently moved to AWS ES service, I am using the following code:

https://github.com/awslabs/flink-stream-processing-refarch/blob/master/flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/utils/ElasticsearchJestSink.java

(Note that this is not the inbuilt Flink ESSink)

You can read this blog post:
https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/


Regards,
Vinay Patil

On Sun, Aug 27, 2017 at 7:02 PM, ant burton [via Apache Flink User Mailing
List archive.] <ml+s2336050n15173...@n4.nabble.com> wrote:

> Thanks! I'll check later this evening.
>
> On Sun, 27 Aug 2017 at 07:44, arpit srivastava <[hidden email]> wrote:
>
>> We also had same setup where ES cluster was behind a proxy server for
>> which port 80 was used which redirected it to ES cluster 9200 port.
>>
>> For using Flink we got the actual ip address of the ES nodes and put that
>> in ips below.
>>
>> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
>> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
>>
>> But this worked only because port 9300 was open on the ES nodes in our
>> setup and so accessible from our Flink cluster.
>>
>> Get your node list on your ES Cluster using
>>
>> curl -XGET 'http:///_nodes'
>>
>>
>>
>> and then check whether you can telnet to that  on port 9300
>> from your Flink cluster nodes
>>
>> $ telnet  9300
>>
>> If this works then you can use the above solution.
>>
>>
>> On Sun, Aug 27, 2017 at 4:09 AM, ant burton <[hidden email]> wrote:
>>
>>> Hi Ted,
>>>
>>> Changing the port from 9300 to 9200 in the example you provided causes
>>> the error in my original message.
>>>
>>> My apologies for not providing context in the form of code in my
>>> original message. To confirm, I am using the example you provided in my
>>> application and have it working using port 9300 in a Docker environment
>>> locally.
>>>
>>> Thanks,
>>>
>>> On 26 Aug 2017, at 23:24, Ted Yu <[hidden email]> wrote:
>>>
>>> If port 9300 in the following example is replaced by 9200, would that
>>> work ?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html
>>>
>>> Please use Flink 1.3.1+
>>>
>>> On Sat, Aug 26, 2017 at 3:00 PM, ant burton <[hidden email]> wrote:
>>>
>>>> Hello,
>>>>
>>>> Has anybody been able to use the Flink Elasticsearch connector to sink
>>>> data to AWS ES?
>>>>
>>>> I don’t believe this is possible as AWS ES only allows access to port
>>>> 9200 (via port 80) on the master node of the ES cluster, and not port 9300
>>>> used by the Flink Elasticsearch connector.
>>>>
>>>> The error message that occurs when attempting to connect to AWS ES via
>>>> port 80 (9200) with the Flink Elasticsearch connector is:
>>>>
>>>> Elasticsearch client is not connected to any Elasticsearch nodes!
>>>>
>>>> Could anybody confirm the above and, if possible, provide an alternative
>>>> solution?
>>>>
>>>> Thank you,
>>>
>>>
>>>
>>>
>>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Elastic-Sink-AWS-ES-tp15162p15174.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Serialization issues with DataStreamUtils

2017-08-26 Thread vinay patil
Hi Robert,

The test case code is as follows:
GenericRecord testData = new GenericData.Record(avroSchema);
SingleOutputStreamOperator testStream = env.fromElements(testData)
    .map(new DummyOperator(...));

Iterator iterator = DataStreamUtils.collect(testStream);

Here is the complete stack trace:

Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 229
Serialization trace:
reserved (org.apache.avro.Schema$NullSchema)
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:149)
at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112)



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-issues-with-DataStreamUtils-tp15139p15159.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Serialization issues with DataStreamUtils

2017-08-25 Thread vinay patil
Hi Guys,

I am using DataStreamUtils for unit testing, the test case succeeds when it
is run individually but I get the following error when all the tests are
run:

Serialization trace:
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:114)

I tried to register the above classes but it did not work. Also, this
error comes randomly for some tests while other tests pass.

What could be the issue?

Regards,
Vinay Patil




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-issues-with-DataStreamUtils-tp15139.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink Data Streaming to S3

2017-08-14 Thread vinay patil
Hi,

Yes, I am able to write to S3 using the DataStream API.

I have described the approach in my answer to your question on SO.

Regards,
Vinay Patil

On Mon, Aug 14, 2017 at 4:21 AM, ant burton [via Apache Flink User Mailing
List archive.] <ml+s2336050n14871...@n4.nabble.com> wrote:

> Hello,
>
> Has anybody been able to write to S3 when using the data streaming API ?
>
> I’m having this problem:
> https://stackoverflow.com/questions/45655850/flink-s3-write-fails-unable-to-load-aws-credentials-from-any-provider-in-the-cha
>
> Thanks
>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Data-Streaming-to-S3-tp14871p14877.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: No file system found with scheme s3

2017-08-12 Thread vinay patil
Hi,

The config key should be *fs.s3a.impl* instead of *fs.s3.impl*.

Also, when you provide the S3 write path in the config file or directly in
code, start it with *s3a://*



Regards,
Vinay Patil

On Sat, Aug 12, 2017 at 6:07 AM, ant burton [via Apache Flink User Mailing
List archive.] <ml+s2336050n1484...@n4.nabble.com> wrote:

> Hello,
>
> After following the instructions to set the S3 filesystem in the
> documentation
> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem)
> I encountered the following error:
>
> No file system found with scheme s3, referenced in file URI 
> 's3:///'.
>
> The documentation goes on to say “If your job submission fails with an
> Exception message noting that No file system found with scheme s3 this
> means that no FileSystem has been configured for S3. Please check out the
> FileSystem Configuration section
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem>
> for details on how to configure this properly.”
>
> After checking over the configuration the error persisted. My
> configuration is as follows.
>
> I am using the docker image flink:1.3.1, with command: local
>
> # flink --version
> Version: 1.3.1, Commit ID: 1ca6e5b
>
> # cat flink/config/flink-conf.yaml | head -n1
> fs.hdfs.hadoopconf: /root/hadoop-config
>
> The rest of the content of flink-conf.yaml is identical to the release
> version.
>
> The following was added to /root/hadoop-config/core-site.xml,
> I understand this is used internally by flink as configuration
> for “org.apache.hadoop.fs.s3a.S3AFileSystem”
>
> I’ve removed my AWS access key and secret for obvious reasons, they are
> present in the actual file ;-)
>
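> # cat  /root/hadoop-config/core-site.xml
> <configuration>
> <property>
> <name>fs.s3.impl</name>
> <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
> </property>
>
> <property>
> <name>fs.s3a.buffer.dir</name>
> <value>/tmp</value>
> </property>
>
> <property>
> <name>fs.s3a.access.key</name>
> <value>MY_ACCESS_KEY</value>
> </property>
>
> <property>
> <name>fs.s3a.secret.key</name>
> <value>MY_SECRET_KEY</value>
> </property>
> </configuration>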
>
> The JARs aws-java-sdk-1.7.4.jar, hadoop-aws-2.7.4.jar,
> httpclient-4.2.5.jar, httpcore-4.2.5.jar were added to flink/lib/ from
> http://apache.mirror.anlx.net/hadoop/common/hadoop-2.7.4/hadoop-2.7.4.tar.gz
>
> # ls flink/lib/
> aws-java-sdk-1.7.4.jar
> flink-dist_2.11-1.3.1.jar
> flink-python_2.11-1.3.1.jar
> flink-shaded-hadoop2-uber-1.3.1.jar
> hadoop-aws-2.7.4.jar
> httpclient-4.2.5.jar
> httpcore-4.2.5.jar
> log4j-1.2.17.jar
> slf4j-log4j12-1.7.7.jar
>
> I’m using the streaming api, with the following example:
>
> // Set StreamExecutionEnvironment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> // Use event time as the stream time characteristic
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> // Add source (input stream)
> DataStream dataStream = StreamUtil.getDataStream(env, params);
>
> // Sink to S3 Bucket
> dataStream.writeAsText("s3://test-flink/test.txt").setParallelism(1);
>
> pom.xml has the following build dependencies.
>
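> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-filesystem_2.10</artifactId>
> <version>1.3.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.hadoop</groupId>
> <artifactId>hadoop-aws</artifactId>
> <version>2.7.2</version>
> </dependency>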
>
>
> Would anybody be able to spare some time to help me resolve my problem?
> I'm sure I’m missing something simple here.
>
> Thanks  :-)
>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-file-system-found-with-scheme-s3-tp14847p14853.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: [EXTERNAL] Re: Help required - "BucketingSink" usage to write HDFS Files

2017-08-07 Thread vinay patil
Hi Raja,

That is why they are in the pending state. You can enable checkpointing by
setting env.enableCheckpointing()

After doing this they will not remain in pending state.

Check this out :
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html

Regards,
Vinay Patil
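
As background to the reply above: as I understand the BucketingSink Javadoc, a
part file goes from in-progress to pending when it is rolled, and from pending
to finished only after a successful checkpoint, so without checkpointing files
stay pending. The following is a toy plain-Java model of that lifecycle under
that reading; it is not Flink code, and the class and method names are
hypothetical.

```java
public class PartFileLifecycle {

    public enum State { IN_PROGRESS, PENDING, FINISHED }

    private final boolean checkpointingEnabled;
    private State state = State.IN_PROGRESS;

    public PartFileLifecycle(boolean checkpointingEnabled) {
        this.checkpointingEnabled = checkpointingEnabled;
    }

    // Batch size reached or bucket inactive: the file is closed and
    // waits for a checkpoint.
    public void roll() {
        if (state == State.IN_PROGRESS) {
            state = State.PENDING;
        }
    }

    // A checkpoint interval elapses. A checkpoint is only taken (and assumed
    // to succeed here) when checkpointing is enabled.
    public void onCheckpointInterval() {
        if (checkpointingEnabled && state == State.PENDING) {
            state = State.FINISHED;
        }
    }

    public State state() {
        return state;
    }

    public static void main(String[] args) {
        PartFileLifecycle withoutCheckpoints = new PartFileLifecycle(false);
        withoutCheckpoints.roll();
        withoutCheckpoints.onCheckpointInterval();
        System.out.println(withoutCheckpoints.state()); // PENDING

        PartFileLifecycle withCheckpoints = new PartFileLifecycle(true);
        withCheckpoints.roll();
        withCheckpoints.onCheckpointInterval();
        System.out.println(withCheckpoints.state()); // FINISHED
    }
}
```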

On Mon, Aug 7, 2017 at 9:15 AM, Raja.Aravapalli [via Apache Flink User
Mailing List archive.] <ml+s2336050n14716...@n4.nabble.com> wrote:

> Hi Vinay,
>
>
>
> Thanks for the response.
>
>
>
> I have NOT enabled any checkpointing.
>
>
>
> Files are rolling out correctly for every 2mb, but the files are remaining
> as below:
>
>
>
> -rw-r--r--   3 2097424 2017-08-06 21:10 *///*/Test/part-0-0.pending
>
> -rw-r--r--   3 1431430 2017-08-06 21:12 *///*/Test/part-0-1.pending
>
>
>
>
>
> Regards,
>
> Raja.
>
>
>
> *From: *vinay patil <[hidden email]>
> *Date: *Sunday, August 6, 2017 at 10:40 PM
> *To: *"[hidden email]" <[hidden email]>
> *Subject: *[EXTERNAL] Re: Help required - "BucketingSink" usage to write
> HDFS Files
>
>
>
> Hi Raja,
>
> Have you enabled checkpointing?
>
> The files will be rolled to complete state when the batch size is reached
> (in your case 2 MB) or when the bucket is inactive for a certain amount of
> time.
>
>
> Regards,
>
> Vinay Patil
>
>
>
> On Mon, Aug 7, 2017 at 7:53 AM, Raja.Aravapalli [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>
>
> Hi,
>
>
>
> I am working on a POC to write to HDFS files using the BucketingSink class.
> Even though the data is being written to HDFS files, the files are left
> with a “.pending” suffix on HDFS.
>
>
>
>
>
> Below is the code I am using. Can someone please help me identify the
> issue and fix it?
>
>
>
>
>
> BucketingSink HdfsSink = new BucketingSink("hdfs://///Test/");
>
> HdfsSink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB
> HdfsSink.setInactiveBucketCheckInterval(1L);
> HdfsSink.setInactiveBucketThreshold(1L);
>
>
>
>
>
> Thanks a lot.
>
>
>
>
>
> Regards,
>
> Raja.
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Help-required-BucketingSink-usage-to-write-HDFS-Files-tp14714p14717.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Help required - "BucketingSink" usage to write HDFS Files

2017-08-06 Thread vinay patil
Hi Raja,

Have you enabled checkpointing?

Files are rolled from the in-progress state to the pending state when the
batch size is reached (in your case 2 MB) or when the bucket is inactive
for a certain amount of time. They move to the finished state only when a
checkpoint completes.
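
The two roll conditions can be written as a single predicate. A minimal
sketch under simplified semantics: RollCondition and shouldClose are
hypothetical names (this is not the BucketingSink source), and the exact
comparison operators are an assumption.

```java
/** Simplified roll decision. NOT the BucketingSink implementation; names are hypothetical. */
public class RollCondition {

    /**
     * Returns true when the current part file should be closed, either because
     * it reached the batch size or because the bucket received no writes for
     * longer than the inactivity threshold.
     */
    public static boolean shouldClose(long bytesWritten, long batchSize,
                                      long lastWriteMillis, long nowMillis,
                                      long inactiveThresholdMillis) {
        boolean sizeReached = bytesWritten >= batchSize;
        boolean inactive = nowMillis - lastWriteMillis > inactiveThresholdMillis;
        return sizeReached || inactive;
    }

    public static void main(String[] args) {
        long twoMb = 2L * 1024 * 1024;
        // Batch size reached: close regardless of activity.
        System.out.println(shouldClose(twoMb, twoMb, 0L, 0L, 60_000L));
        // Small file, but idle for 10 ms with a 1 ms threshold: close.
        System.out.println(shouldClose(1L, twoMb, 0L, 10L, 1L));
        // Small file that was just written to: keep it open.
        System.out.println(shouldClose(1L, twoMb, 0L, 0L, 1L));
    }
}
```

Assuming the threshold is given in milliseconds, a value of 1L as in the
code quoted below means a bucket counts as inactive almost immediately after
its last write.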


Regards,
Vinay Patil

On Mon, Aug 7, 2017 at 7:53 AM, Raja.Aravapalli [via Apache Flink User
Mailing List archive.] <ml+s2336050n14714...@n4.nabble.com> wrote:

>
>
> Hi,
>
>
>
> I am working on a PoC to write HDFS files using the BucketingSink class.
> Even though the data is being written to HDFS, the files are left with a
> “.pending” suffix on HDFS.
>
>
>
>
>
> Below is the code I am using. Can someone please help me identify the
> issue and fix it?
>
>
>
>
>
> BucketingSink HdfsSink = new BucketingSink("hdfs://///Test/");
>
> HdfsSink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB
> HdfsSink.setInactiveBucketCheckInterval(1L);
> HdfsSink.setInactiveBucketThreshold(1L);
>
>
>
>
>
> Thanks a lot.
>
>
>
>
>
> Regards,
>
> Raja.
>
>





Re: Memory Leak - Flink / RocksDB ?

2017-07-25 Thread vinay patil
Hi Shashwat,

Are you specifying the RocksDBStateBackend from the flink-conf.yaml or from
code?

If you are specifying it in code, you can try using
PredefinedOptions.FLASH_SSD_OPTIMIZED.
You can also try enabling incremental checkpointing (this feature is
available from Flink 1.3.0).

If the above does not solve your issue, you can control the memory usage of
RocksDB by tuning the following values and then checking your performance:

*DBOptions:*
 (along with FLASH_SSD_OPTIMIZED, add the following)
 maxBackgroundCompactions(4)

*ColumnFamilyOptions:*
  max_buffer_size : 512 MB
  block_cache_size : 128 MB
  max_write_buffer_number : 5
  min_write_buffer_number_to_merge : 2
  cacheIndexAndFilterBlocks : true
  optimizeFiltersForHits : true
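
To see what these values mean for memory, the arithmetic can be sketched as
follows. This is a rough upper bound, not a RocksDB or Flink API:
RocksDbMemoryEstimate and its methods are hypothetical names. The sketch
assumes that max_buffer_size refers to RocksDB's write_buffer_size, that
every column family (one per keyed state in Flink) gets its own memtables
and block cache, and that index and filter blocks are counted inside the
block cache because cacheIndexAndFilterBlocks is true. The counts of 2
states per subtask and 4 subtasks per TaskManager are made-up inputs.

```java
/** Back-of-the-envelope RocksDB memory bound. Hypothetical helper, not a real API. */
public class RocksDbMemoryEstimate {
    public static final long MB = 1024L * 1024L;

    /** Memtables (write buffer size x number of buffers) plus the block cache. */
    public static long perColumnFamily(long writeBufferSize,
                                       int maxWriteBufferNumber,
                                       long blockCacheSize) {
        long memtables = writeBufferSize * maxWriteBufferNumber;
        return memtables + blockCacheSize;
    }

    /** Scales the per-column-family bound to all states on one TaskManager. */
    public static long perTaskManager(long perColumnFamily,
                                      int statesPerSubtask,
                                      int subtasksPerTaskManager) {
        return perColumnFamily * statesPerSubtask * subtasksPerTaskManager;
    }

    public static void main(String[] args) {
        long perCf = perColumnFamily(512 * MB, 5, 128 * MB);
        System.out.println("per column family: " + perCf / MB + " MB"); // 2688 MB
        long perTm = perTaskManager(perCf, 2, 4);
        System.out.println("per TaskManager:   " + perTm / MB + " MB"); // 21504 MB
    }
}
```

Under these assumptions, the memory left outside the JVM heap has to cover
roughly this amount, otherwise the container can be killed even though the
heap itself looks healthy.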


I would recommend reading the following documents:

*Memory usage of RocksDB* :
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB

*RocksDB Tuning Guide:*
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide

Hope it helps.


Regards,
Vinay Patil

On Tue, Jul 25, 2017 at 6:51 PM, Shashwat Rastogi [via Apache Flink User
Mailing List archive.] <ml+s2336050n14439...@n4.nabble.com> wrote:

> Hi,
>
> We have several Flink jobs, all of which read data from Kafka, do some
> aggregations (over sliding windows of (1d, 1h)) and write the data to
> Cassandra. Something like:
>
> ```
> DataStream lines = env.addSource(new FlinkKafkaConsumer010( … ));
> DataStream events = lines.map(line -> parse(line));
> DataStream stats = events
> .keyBy("id")
> .timeWindow(Time.days(1), Time.hours(1)) // 1 day window, sliding every hour
> .sum(new MyAggregateFunction());
> writeToCassandra(stats);
> ```
>
> We recently switched to RocksDBStateBackend for its suitability for
> large states and long windows. However, after the switch a memory issue
> has come up: memory utilisation on the TaskManager gradually increases from
> 50 GB to ~63 GB until the container is killed. We are unable to figure out
> what is causing this behaviour. Is there a memory leak in RocksDB?
>
> How much memory should we allocate to the Flink TaskManager? Since
> RocksDB is a native application and does not use the JVM, how much of the
> memory should we leave for RocksDB (out of 64 GB of total memory)?
> Is there a way to set the maximum amount of memory that will be used by
> RocksDB so that it doesn't overwhelm the system? Are there some
> recommended settings for RocksDB for larger states (for a 1 day
> window the average state size is 3 GB)?
>
> Any help would be greatly appreciated. I am using Flink v1.2.1.
> Thanks in advance.
>
> Best,
> Shashwat
>





Re: Why would a kafka source checkpoint take so long?

2017-07-13 Thread Vinay Patil
Hi Stephan,

Sure, I will do that the next time I observe it.

Regards,
Vinay Patil

On Thu, Jul 13, 2017 at 8:09 PM, Stephan Ewen <se...@apache.org> wrote:

> Is there any way you can pull a thread dump from the TMs at the point when
> that happens?
>
> On Wed, Jul 12, 2017 at 8:50 PM, vinay patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi Gyula,
>>
>> I have observed a similar issue with FlinkKafkaConsumer09 and 010 and
>> posted it to the mailing list as well. The issue is intermittent; however,
>> whenever it happens, checkpoints either fail or take a long time to
>> complete.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Jul 12, 2017 at 7:00 PM, Gyula Fóra [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>>> I have added logging that will help determine this as well; next time
>>> this happens I will post the results. (Although there doesn't seem to be
>>> high backpressure.)
>>>
>>> Thanks for the tips,
>>> Gyula
>>>
>>> On Wed, Jul 12, 2017 at 15:27, Stephan Ewen <[hidden email]> wrote:
>>>
>>>> Can it be that the checkpoint thread is waiting to grab the lock, which
>>>> is held by the chain under backpressure?
>>>>
>>>> On Wed, Jul 12, 2017 at 12:23 PM, Gyula Fóra <[hidden email]> wrote:
>>>>
>>>>> Yes, that's definitely what I am about to do next, but I just thought
>>>>> maybe someone has seen this before.
>>>>>
>>>>> Will post info next time it happens. (Not guaranteed to happen soon as
>>>>> it didn't happen for a long time before)
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Wed, Jul 12, 2017, 12:13 Stefan Richter <[hidden email]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> could you introduce some logging to figure out from which method call
>>>>>> the delay is introduced?
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> On 12.07.2017 at 11:37, Gyula Fóra <[hidden email]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are using the latest 1.3.1
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Wed, Jul 12, 2017 at 10:44, Urs Schoenenberger <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> I don't know the cause unfortunately, but we observed a similar
>>>>>>> issue on Flink 1.1.3. The problem seems to be gone after upgrading
>>>>>>> to 1.2.1.
>>>>>>> Which version are you running on?
>>>>>>>
>>>>>>> Urs
>>>>>>>
>>>>>>> On 12.07.2017 09:48, Gyula Fóra wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I have noticed a strange behavior in one of our jobs: every once
>>>>>>> > in a while the Kafka source checkpointing time becomes extremely
>>>>>>> > large compared to what it usually is. (To be very specific it is a
>>>>>>> > kafka source chained with a stateless map operator)
>>>>>>> >
>>>>>>> > To be more specific checkpointing the offsets usually takes around
>>>>>>> > 10ms which sounds reasonable but in some checkpoints this goes into
>>>>>>> > the 3-5 minutes range practically blocking the job for that period
>>>>>>> > of time. Yesterday I have observed even 10 minute delays. First I
>>>>>>> > thought that some sources might trigger checkpoints later than
>>>>>>> > others, but adding

Re: Why would a kafka source checkpoint take so long?

2017-07-12 Thread vinay patil
Hi Gyula,

I have observed a similar issue with FlinkKafkaConsumer09 and 010 and
posted it to the mailing list as well. The issue is intermittent; however,
whenever it happens, checkpoints either fail or take a long time to
complete.

Regards,
Vinay Patil

On Wed, Jul 12, 2017 at 7:00 PM, Gyula Fóra [via Apache Flink User Mailing
List archive.] <ml+s2336050n14210...@n4.nabble.com> wrote:

> I have added logging that will help determine this as well; next time this
> happens I will post the results. (Although there doesn't seem to be high
> backpressure.)
>
> Thanks for the tips,
> Gyula
>
> On Wed, Jul 12, 2017 at 15:27, Stephan Ewen <[hidden email]> wrote:
>
>> Can it be that the checkpoint thread is waiting to grab the lock, which
>> is held by the chain under backpressure?
>>
>> On Wed, Jul 12, 2017 at 12:23 PM, Gyula Fóra <[hidden email]> wrote:
>>
>>> Yes, that's definitely what I am about to do next, but I just thought
>>> maybe someone has seen this before.
>>>
>>> Will post info next time it happens. (Not guaranteed to happen soon as
>>> it didn't happen for a long time before)
>>>
>>> Gyula
>>>
>>> On Wed, Jul 12, 2017, 12:13 Stefan Richter <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> could you introduce some logging to figure out from which method call
>>>> the delay is introduced?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 12.07.2017 at 11:37, Gyula Fóra <[hidden email]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using the latest 1.3.1
>>>>
>>>> Gyula
>>>>
>>>> On Wed, Jul 12, 2017 at 10:44, Urs Schoenenberger <[hidden email]> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> I don't know the cause unfortunately, but we observed a similar issue
>>>>> on Flink 1.1.3. The problem seems to be gone after upgrading to 1.2.1.
>>>>> Which version are you running on?
>>>>>
>>>>> Urs
>>>>>
>>>>> On 12.07.2017 09:48, Gyula Fóra wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I have noticed a strange behavior in one of our jobs: every once in
>>>>> > a while the Kafka source checkpointing time becomes extremely large
>>>>> > compared to what it usually is. (To be very specific it is a kafka
>>>>> > source chained with a stateless map operator)
>>>>> >
>>>>> > To be more specific checkpointing the offsets usually takes around
>>>>> > 10ms which sounds reasonable but in some checkpoints this goes into
>>>>> > the 3-5 minutes range practically blocking the job for that period
>>>>> > of time. Yesterday I have observed even 10 minute delays. First I
>>>>> > thought that some sources might trigger checkpoints later than
>>>>> > others, but after adding some logging and comparing it, it seems
>>>>> > that the triggerCheckpoint was received at the same time.
>>>>> >
>>>>> > Interestingly only one of the 3 kafka sources in the job seems to be
>>>>> > affected (last time I checked at least). We are still using the 0.8
>>>>> > consumer with commit on checkpoints. Also I don't see this happen in
>>>>> > other jobs.
>>>>> >
>>>>> > Any clue on what might cause this?
>>>>> >
>>>>> > Thanks :)
>>>>> > Gyula
>>>>> >
>>>>> >
>>>>> >

Re: Is there some metric info about RocksdbBackend?

2017-06-30 Thread vinay patil
Hi Gerry,

I had set the following parameters on DBOptions:
setStatsDumpPeriodSec(600) // 10 mins
setDbLogDir("/var/tmp/")

I saw the log files being generated in /var/tmp, but they did not contain
the statistics; they only list all the options that are used for RocksDB.

Maybe you can try calling createStatistics() as well.

By the way, I was able to get rid of the memory consumption issue. Did you
try using FLASH_SSD_OPTIMIZED?


Regards,
Vinay Patil

On Fri, Jun 30, 2017 at 2:49 PM, gerryzhou [via Apache Flink User Mailing
List archive.] <ml+s2336050n14081...@n4.nabble.com> wrote:

> Hi,
>  Are there any metrics for the RocksDB backend in Flink, such as SST
> compaction times, memtable dump times, block cache size and so on?
> Currently, when using RocksDB as the backend, its behavior is a black box
> for us and it consumes a lot of memory; I want to understand its behavior
> via metrics.
>
>
>
>
>





Re: Checkpointing with RocksDB as statebackend

2017-06-29 Thread Vinay Patil
Hi Guys,

I am able to overcome the physical memory consumption issue by setting the
following RocksDB options:

*DBOptions:*
 along with FLASH_SSD_OPTIMIZED, added the following:
 maxBackgroundCompactions(4)

*ColumnFamilyOptions:*
  max_buffer_size : 512 MB
  block_cache_size : 128 MB
  max_write_buffer_number : 5
  min_write_buffer_number_to_merge : 2
  cacheIndexAndFilterBlocks : true
  optimizeFiltersForHits : true

I am going to tune these options further to get the desired performance.

I have set DbLogDir("/var/tmp/") and StatsDumpPeriodSec, but the log file
in /var/tmp/ only contains the configured options.

Where will I get the RocksDB statistics if I set createStatistics?

Let me know if any other configurations will help to get better
performance. The physical memory now increases slowly and I can see
periodic drops in the graph (this means flushing is taking place at
regular intervals).

Regards,
Vinay Patil

On Thu, Jun 29, 2017 at 9:13 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> The state size is not that huge. When the Flink UI showed the data
> sent as 4 GB, the physical memory usage was close to 90 GB.
>
> I will re-run after setting the flushing options of RocksDB, because I am
> facing this issue on 1.2.0 as well.
>
> Regards,
> Vinay Patil
>
> On Thu, Jun 29, 2017 at 9:03 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Yup, I got that. I’m just wondering whether this occurs only with enabled
>> checkpointing or also when checkpointing is disabled.
>>
>> On 29. Jun 2017, at 17:31, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>
>> Hi Aljoscha,
>>
>> Yes, I have tried with 1.2.1 and 1.3.0 and am facing the same issue.
>>
>> The issue is not with heap memory; it is the off-heap memory that keeps
>> growing (please refer to the earlier snapshot I attached, in which the
>> graph keeps on growing).
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Jun 29, 2017 at 8:55 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Just a quick remark: Flink 1.3.0 and 1.2.1 always use FRocksDB, you
>>> shouldn’t manually specify that.
>>>
>>> On 29. Jun 2017, at 17:20, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>>
>>> Hi Gerry,
>>>
>>> I have also faced this issue on 1.3.0, even when using FRocksDB and
>>> enabling incremental checkpointing.
>>>
>>> You can add FRocksDB dependency as shown here :
>>> https://github.com/apache/flink/pull/3704
>>>
>>> We will have to set some RocksDB parameters  to get this working.
>>>
>>> @Stefan or @Stephan : can you please help in resolving this issue
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Jun 29, 2017 at 6:01 PM, gerryzhou [via Apache Flink User
>>> Mailing List archive.] <ml+s2336050n1406...@n4.nabble.com> wrote:
>>>
>>>> Hi, Vinay,
>>>>  I observed a similar problem in flink 1.3.0 with rocksdb. I wonder
>>>> how to use FRocksDB as you mentioned above. Thanks.
>>>>
>>>
>>>
>>>
>>
>>
>


Re: Checkpointing with RocksDB as statebackend

2017-06-29 Thread Vinay Patil
The state size is not that huge. When the Flink UI showed the data
sent as 4 GB, the physical memory usage was close to 90 GB.

I will re-run after setting the flushing options of RocksDB, because I am
facing this issue on 1.2.0 as well.

Regards,
Vinay Patil

On Thu, Jun 29, 2017 at 9:03 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yup, I got that. I’m just wondering whether this occurs only with enabled
> checkpointing or also when checkpointing is disabled.
>
> On 29. Jun 2017, at 17:31, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Yes, I have tried with 1.2.1 and 1.3.0 and am facing the same issue.
>
> The issue is not with heap memory; it is the off-heap memory that keeps
> growing (please refer to the earlier snapshot I attached, in which the
> graph keeps on growing).
>
>
> Regards,
> Vinay Patil
>
> On Thu, Jun 29, 2017 at 8:55 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Just a quick remark: Flink 1.3.0 and 1.2.1 always use FRocksDB, you
>> shouldn’t manually specify that.
>>
>> On 29. Jun 2017, at 17:20, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>
>> Hi Gerry,
>>
>> I have also faced this issue on 1.3.0, even when using FRocksDB and
>> enabling incremental checkpointing.
>>
>> You can add FRocksDB dependency as shown here :
>> https://github.com/apache/flink/pull/3704
>>
>> We will have to set some RocksDB parameters  to get this working.
>>
>> @Stefan or @Stephan : can you please help in resolving this issue
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Jun 29, 2017 at 6:01 PM, gerryzhou [via Apache Flink User Mailing
>> List archive.] <ml+s2336050n1406...@n4.nabble.com> wrote:
>>
>>> Hi, Vinay,
>>>  I observed a similar problem in flink 1.3.0 with rocksdb. I wonder
>>> how to use FRocksDB as you mentioned above. Thanks.
>>>
>>
>>
>>
>
>


Re: Checkpointing with RocksDB as statebackend

2017-06-29 Thread Vinay Patil
Hi Aljoscha,

Yes, I have tried with 1.2.1 and 1.3.0 and am facing the same issue.

The issue is not with heap memory; it is the off-heap memory that keeps
growing (please refer to the earlier snapshot I attached, in which the
graph keeps on growing).


Regards,
Vinay Patil

On Thu, Jun 29, 2017 at 8:55 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Just a quick remark: Flink 1.3.0 and 1.2.1 always use FRocksDB, you
> shouldn’t manually specify that.
>
> On 29. Jun 2017, at 17:20, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> Hi Gerry,
>
> I have also faced this issue on 1.3.0, even when using FRocksDB and
> enabling incremental checkpointing.
>
> You can add the FRocksDB dependency as shown here:
> https://github.com/apache/flink/pull/3704
>
> We will have to set some RocksDB parameters  to get this working.
>
> @Stefan or @Stephan : can you please help in resolving this issue
>
> Regards,
> Vinay Patil
>
> On Thu, Jun 29, 2017 at 6:01 PM, gerryzhou [via Apache Flink User Mailing
> List archive.] <ml+s2336050n1406...@n4.nabble.com> wrote:
>
>> Hi, Vinay,
>>  I observed a similar problem in flink 1.3.0 with rocksdb. I wonder
>> how to use FRocksDB as you mentioned above. Thanks.
>>
>
>
>


Re: Checkpointing with RocksDB as statebackend

2017-06-29 Thread Vinay Patil
Hi Gerry,

I have also faced this issue on 1.3.0, even when using FRocksDB and
enabling incremental checkpointing.

You can add the FRocksDB dependency as shown here:
https://github.com/apache/flink/pull/3704

We will have to set some RocksDB parameters to get this working.

@Stefan or @Stephan: can you please help in resolving this issue?

Regards,
Vinay Patil

On Thu, Jun 29, 2017 at 6:01 PM, gerryzhou [via Apache Flink User Mailing
List archive.] <ml+s2336050n1406...@n4.nabble.com> wrote:

> Hi, Vinay,
>  I observed a similar problem in flink 1.3.0 with rocksdb. I wonder
> how to use FRocksDB as you mentioned above. Thanks.
>


Re: Checkpointing with RocksDB as statebackend

2017-06-29 Thread Vinay Patil
Hi Xiaogang,

Yes, I have set that and still got the same issue. I don't see the graph
coming down. I also checked the HDFS usage: only 3 GB is being used, which
means nothing is getting flushed to disk.

I think the parameters are not being applied properly. I am using FRocksDB;
could it be causing this error?


Regards,
Vinay Patil

On Thu, Jun 29, 2017 at 7:30 AM, SHI Xiaogang <shixiaoga...@gmail.com>
wrote:

> Hi Vinay,
>
> We observed a similar problem before. We found that RocksDB keeps a lot of
> index and filter blocks in memory. With the growth in state size (in our
> cases, most states are only cleared in windowed streams), these blocks will
> occupy much more memory.
>
> We now let RocksDB put these blocks into block cache (via
> setCacheIndexAndFilterBlocks), and limit the memory usage of RocksDB with
> block cache size. Performance may be degraded, but TMs can avoid being
> killed by YARN for overused memory.
>
> This may not be the same cause of your problem, but it may be helpful.
>
> Regards,
> Xiaogang
>
>
>
>
>
>
> 2017-06-28 23:26 GMT+08:00 Vinay Patil <vinay18.pa...@gmail.com>:
>
>> Hi Aljoscha,
>>
>> I am using an event-time based tumbling window where allowedLateness
>> is set to Long.MAX_VALUE, and I have a custom trigger similar to
>> 1.0.3, where Flink was not discarding late elements (we have discussed this
>> scenario before).
>>
>> The watermark is working correctly because I have validated the records
>> earlier.
>>
>> I suspected that the RocksDB state backend was not set, but in the logs
>> I can clearly see that RocksDB is initialized successfully, so that should
>> not be an issue.
>>
>> Also, I have not changed any major code since the last performance test I
>> did.
>>
>> The snapshot I attached shows off-heap memory; I have only assigned
>> 12 GB of heap memory per TM.
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Jun 28, 2017 at 8:43 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Just a quick question, because I’m not sure whether this came up in the
>>> discussion so far: what kind of windows are you using? Processing
>>> time/event time? Sliding Windows/Tumbling Windows? Allowed lateness? How is
>>> the watermark behaving?
>>>
>>> Also, the latest memory usage graph you sent, is that heap memory or
>>> off-heap memory or both?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> > On 27. Jun 2017, at 11:45, vinay patil <vinay18.pa...@gmail.com>
>>> wrote:
>>> >
>>> > Hi Stephan,
>>> >
>>> > I am observing similar issue with Flink 1.2.1
>>> >
>>> > The memory is continuously increasing and data is not getting flushed
>>> to
>>> > disk.
>>> >
>>> > I have attached the snapshot for reference.
>>> >
>>> > Also the data processed till now is only 17GB and above 120GB memory is
>>> > getting used.
>>> >
>>> > Is there any change wrt RocksDB configurations
>>> >
>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14013/TM_Memory_Usage.png>
>>> >
>>> > Regards,
>>> > Vinay Patil
>>> >
>>> >
>>> >
>>>
>>>
>>
>


Re: Checkpointing with RocksDB as statebackend

2017-06-28 Thread Vinay Patil
Hi Aljoscha,

I am using an event-time tumbling window with allowedLateness set to
Long.MAX_VALUE, and I have a custom trigger that mimics the 1.0.3 behavior,
where Flink did not discard late elements (we have discussed this scenario
before).

The watermark is working correctly; I validated the records earlier.

I suspected that the RocksDB state backend was not being set, but the logs
clearly show that RocksDB is initialized successfully, so that should not be
the issue.

I have also not made any major code changes since the last performance test
I ran.

The snapshot I attached shows off-heap memory; I have assigned only 12GB of
heap memory per TM.


Regards,
Vinay Patil

On Wed, Jun 28, 2017 at 8:43 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Just a quick question, because I’m not sure whether this came up in the
> discussion so far: what kind of windows are you using? Processing
> time/event time? Sliding Windows/Tumbling Windows? Allowed lateness? How is
> the watermark behaving?
>
> Also, the latest memory usage graph you sent, is that heap memory or
> off-heap memory or both?
>
> Best,
> Aljoscha
>
> > On 27. Jun 2017, at 11:45, vinay patil <vinay18.pa...@gmail.com> wrote:
> >
> > Hi Stephan,
> >
> > I am observing similar issue with Flink 1.2.1
> >
> > The memory is continuously increasing and data is not getting flushed to
> > disk.
> >
> > I have attached the snapshot for reference.
> >
> > Also the data processed till now is only 17GB and above 120GB memory is
> > getting used.
> >
> > Is there any change wrt RocksDB configurations
> >
> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14013/TM_Memory_Usage.png>
> >
> > Regards,
> > Vinay Patil
> >
> >
> >
>
>


Re: Checkpointing with RocksDB as statebackend

2017-06-27 Thread vinay patil
Hi Stephan,

I am observing a similar issue with Flink 1.2.1.

The memory usage is continuously increasing and data is not getting flushed
to disk.

I have attached a snapshot for reference.

Also, only 17GB of data has been processed so far, yet more than 120GB of
memory is in use.

Is there any change with respect to the RocksDB configuration?

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14013/TM_Memory_Usage.png>
 

Regards,
Vinay Patil



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p14013.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpointing with RocksDB as statebackend

2017-06-26 Thread Vinay Patil
Hi Stephan,

I have upgraded to Flink 1.3.0 to test RocksDB with incremental
checkpointing (the PredefinedOptions used is FLASH_SSD_OPTIMIZED).

I am currently creating a YARN session and running the job on EMR with
r3.4xlarge instances (122GB of memory), and I have observed that it uses
almost all of the memory. This did not happen with the previous version,
where at most 30GB was used.

Because of this issue the job manager was killed and the job failed.

Are there any other configurations I need to set?

P.S. I am currently using FRocksDB.


Regards,
Vinay Patil

On Fri, May 5, 2017 at 1:01 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

> Hi Stephan,
>
> I tested the pipeline with the FRocksDB dependency  (with SSD_OPTIMIZED
> option), none of the checkpoints were failed.
>
> For checkpointing 10GB of state it took 45secs which is better than the
> previous results.
>
> Let me know if there are any other configurations which will help to get
> better results.
>
> Regards,
> Vinay Patil
>
> On Thu, May 4, 2017 at 10:05 PM, Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi Stephan,
>>
>> I see that the RocksDb issue is solved by having a separate FRocksDB
>> dependency.
>>
>> I have added this dependency as discussed on the JIRA. Is it the only
>> thing that we have to do or we have to change the code  for setting RocksDB
>> state backend as well ?
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Tue, Mar 28, 2017 at 1:20 PM, Stefan Richter [via Apache Flink User
>> Mailing List archive.] <ml-node+s2336050n12429...@n4.nabble.com> wrote:
>>
>>> Hi,
>>>
>>> I was able to come up with a custom build of RocksDB yesterday that
>>> seems to fix the problems. I still have to build the native code for
>>> different platforms and then test it. I cannot make promises about the
>>> 1.2.1 release, but I would be optimistic that this will make it in.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 27.03.2017 at 19:12, vinay patil <[hidden email]> wrote:
>>>
>>> Hi Stephan,
>>>
>>> Just an update, last week I did a run with state size close to 18GB, I
>>> did not observe the pipeline getting stopped in between with G1GC enabled.
>>>
>>> I had observed checkpoint failures when the state size was close to 38GB
>>> (but in this case G1GC was not enabled)
>>>
>>> Is it possible to get the RocksDB fix in 1.2.1 so that I can test it out.
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Sat, Mar 18, 2017 at 12:25 AM, Stephan Ewen [via Apache Flink User
>>> Mailing List archive.] <[hidden email]> wrote:
>>>
>>>> @vinay Let's see how fast we get this fix in - I hope yes. It may
>>>> depend also a bit on the RocksDB community.
>>>>
>>>> In any case, if it does not make it in, we can do a 1.2.2 release
>>>> immediately after (I think the problem is big enough to warrant that), or
>>>> at least release a custom version of the RocksDB state backend that
>>>> includes the fix.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Mar 17, 2017 at 5:51 PM, vinay patil <[hidden email]> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> Is the performance related change  of RocksDB going to be part of
>>>>> Flink 1.2.1 ?
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Thu, Mar 16, 2017 at 6:13 PM, Stephan Ewen [via Apache Flink User
>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>
>>>>>> The only immediate workaround is to use windows with "reduce" or
>>>>>> "fold" or "aggregate" and not "apply". And to not use an evictor.
>>>>>>
>>>>>> The good news is that I think we have a good way of fixing this soon,
>>>>>> making an adjustment in RocksDB.
>>>>>>
>>>>>> For the Yarn / g1gc question: Not 100% sure about that - you can
>>>>>> check if it used g1gc. If not, you may be able to pass this through the
>>>>>> "env.java.opt
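
The workaround quoted above (window functions that aggregate incrementally, such as "reduce", "fold" or "aggregate", instead of "apply") keeps one running value per window rather than a growing list of elements. The plain-Java sketch below illustrates only that difference in the amount of state held; it does not use Flink's API, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical illustration of buffered versus incrementally reduced window state. */
final class WindowStateSketch {

    /** "apply"-style window: every element is buffered until the window fires. */
    static final class BufferingWindow {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        /** Number of values that would have to be kept in state. */
        int storedValues() {
            return elements.size();
        }

        long fire() {
            long sum = 0;
            for (long v : elements) {
                sum += v;
            }
            return sum;
        }
    }

    /** "reduce"-style window: each element is folded into a single running value. */
    static final class ReducingWindow {
        private final BinaryOperator<Long> reducer;
        private Long current; // null until the first element arrives

        ReducingWindow(BinaryOperator<Long> reducer) {
            this.reducer = reducer;
        }

        void add(long value) {
            current = (current == null) ? Long.valueOf(value) : reducer.apply(current, value);
        }

        /** At most one value is ever kept in state. */
        int storedValues() {
            return current == null ? 0 : 1;
        }

        long fire() {
            return current == null ? 0L : current;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        ReducingWindow reducing = new ReducingWindow(Long::sum);
        for (long i = 1; i <= 100; i++) {
            buffering.add(i);
            reducing.add(i);
        }
        System.out.println("buffered values: " + buffering.storedValues());
        System.out.println("reduced values:  " + reducing.storedValues());
    }
}
```

Both variants produce the same result when the window fires; only the amount of per-window state differs, which is the part relevant to the checkpoint sizes discussed in this thread.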

Re: In-transit Data Encryption in EMR

2017-06-09 Thread vinay patil
Hi Guys,

Can anyone please help me with the queries below?

On Jun 8, 2017 11:30 PM, "Vinay Patil" <vinay18.pa...@gmail.com> wrote:

> Hi Guys,
>
> I am able to setup SSL correctly, however the following command  does not
> work correctly and results in the error I had mailed earlier
>
> flink run -m yarn-cluster -yt deploy-keys/ TestJob.jar
>
>
> Few Doubts:
> 1. Can anyone please explain me how do you test if SSL is working
> correctly ? Currently I am just relying on the logs.
>
> 2. Wild Card is not working with the keytool command, can you please let
> me know what is the issue with the following command:
> keytool -genkeypair -alias ca -keystore: -ext SAN=dns:node1.*
>
>
> Regards,
> Vinay Patil
>
> On Mon, Jun 5, 2017 at 8:43 PM, vinay patil [via Apache Flink User Mailing
> List archive.] <ml+s2336050n13490...@n4.nabble.com> wrote:
>
>> Hi Gordon,
>>
>> The yarn session gets created when I try to run the following command:
>> yarn-session.sh -n 4 -s 2 -jm 1024 -tm 3000 -d --ship deploy-keys/
>>
>> However when I try to access the Job Manager UI, it gives me exception as
>> :
>> javax.net.ssl.SSLHandshakeException: 
>> sun.security.validator.ValidatorException:
>> PKIX path building failed: 
>> sun.security.provider.certpath.SunCertPathBuilderException:
>> unable to find valid certification path to requested target
>>
>> I am able to see the Job Manager UI  when I imported the CA certificate
>> to java truststore on EMR master node :
>> keytool -keystore /etc/alternatives/jre/lib/security/cacerts -importcert
>> -alias FLINKSSL -file ca.cer
>>
>>
>> Does this mean that SSL is configured correctly ? I can see in the Job
>> Manager configurations and also in th e logs. Is there any other way to
>> verify ?
>>
>> Also the keystore and truststore  password should be masked in the logs
>> which is not case.
>>
>>
>>
>>
>>
>>
>> 2017-06-05 14:51:31,135 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.enabled, true
>> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.keystore, deploy-keys/ca.keystore
>> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.keystore-password, password
>> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.key-password, password
>> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.truststore, deploy-keys/ca.truststore
>> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.truststore-password, password
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/In-transit-Data-Encryption-in-EMR-tp13455p13609.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: In-transit Data Encryption in EMR

2017-06-08 Thread vinay patil
Hi Guys,

I am able to set up SSL correctly; however, the following command does not
work and results in the error I mailed earlier:

flink run -m yarn-cluster -yt deploy-keys/ TestJob.jar


A few questions:
1. How can I test whether SSL is working correctly? Currently I am relying
only on the logs.

2. The wildcard is not working with the keytool command. Can you please let
me know what is wrong with the following command?
keytool -genkeypair -alias ca -keystore: -ext SAN=dns:node1.*
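
One alternative to a wildcard is to list every node explicitly in the SAN extension. The plain-Java sketch below only assembles the value passed to keytool's `-ext` option from a list of host names and IP addresses; the class name `SanArgumentBuilder` and the sample hosts are hypothetical, and the remaining keytool options are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that assembles the value for keytool's -ext option. */
final class SanArgumentBuilder {

    /** Builds "SAN=dns:...,ip:..." from explicit host names and IP addresses. */
    static String build(List<String> dnsNames, List<String> ipAddresses) {
        List<String> entries = new ArrayList<>();
        for (String dns : dnsNames) {
            entries.add("dns:" + dns);
        }
        for (String ip : ipAddresses) {
            entries.add("ip:" + ip);
        }
        if (entries.isEmpty()) {
            throw new IllegalArgumentException("at least one DNS name or IP address is required");
        }
        return "SAN=" + String.join(",", entries);
    }

    public static void main(String[] args) {
        // Sample values only; replace them with the real node names and addresses.
        String san = build(
                Arrays.asList("node1.example.internal", "node2.example.internal"),
                Arrays.asList("192.168.0.10"));
        System.out.println("-ext " + san);
    }
}
```

The resulting string can be appended to the keytool command line after `-ext`; every node that serves TLS connections would need an entry.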


Regards,
Vinay Patil

On Mon, Jun 5, 2017 at 8:43 PM, vinay patil [via Apache Flink User Mailing
List archive.] <ml+s2336050n13490...@n4.nabble.com> wrote:

> Hi Gordon,
>
> The yarn session gets created when I try to run the following command:
> yarn-session.sh -n 4 -s 2 -jm 1024 -tm 3000 -d --ship deploy-keys/
>
> However when I try to access the Job Manager UI, it gives me exception as
> :
> javax.net.ssl.SSLHandshakeException: 
> sun.security.validator.ValidatorException:
> PKIX path building failed: 
> sun.security.provider.certpath.SunCertPathBuilderException:
> unable to find valid certification path to requested target
>
> I am able to see the Job Manager UI  when I imported the CA certificate to
> java truststore on EMR master node :
> keytool -keystore /etc/alternatives/jre/lib/security/cacerts -importcert
> -alias FLINKSSL -file ca.cer
>
>
> Does this mean that SSL is configured correctly ? I can see in the Job
> Manager configurations and also in th e logs. Is there any other way to
> verify ?
>
> Also the keystore and truststore  password should be masked in the logs
> which is not case.
>
>
>
>
>
>
> 2017-06-05 14:51:31,135 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.enabled, true
> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.keystore, deploy-keys/ca.keystore
> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.keystore-password, password
> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.key-password, password
> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.truststore, deploy-keys/ca.truststore
> 2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.truststore-password, password
>
>
> Regards,
> Vinay Patil
>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/In-transit-Data-Encryption-in-EMR-tp13455p13598.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: In-transit Data Encryption in EMR

2017-06-05 Thread vinay patil
Hi Gordon,

The yarn session gets created when I try to run the following command:
yarn-session.sh -n 4 -s 2 -jm 1024 -tm 3000 -d --ship deploy-keys/

However, when I try to access the Job Manager UI, I get the following exception:
javax.net.ssl.SSLHandshakeException:
sun.security.validator.ValidatorException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target

I am able to see the Job Manager UI after I imported the CA certificate into
the Java truststore on the EMR master node:
keytool -keystore /etc/alternatives/jre/lib/security/cacerts -importcert
-alias FLINKSSL -file ca.cer


Does this mean that SSL is configured correctly? I can see the SSL settings in
the Job Manager configuration and also in the logs. Is there any other way to
verify?

Also, the keystore and truststore passwords should be masked in the logs,
which is currently not the case.

2017-06-05 14:51:31,135 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.enabled, true
2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.keystore, deploy-keys/ca.keystore
2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.keystore-password, password
2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.key-password, password
2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.truststore, deploy-keys/ca.truststore
2017-06-05 14:51:31,136 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.ssl.truststore-password, password
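
On the masking point raised above, here is a minimal plain-Java sketch of how a configuration value could be hidden before it is logged. The class `ConfigLogMasker`, its methods, and the rule "any key containing `password` is sensitive" are assumptions made for illustration; this is not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: hides sensitive configuration values before they are logged. */
final class ConfigLogMasker {

    private static final String HIDDEN = "******";

    /** Returns the value unchanged unless the key looks like it holds a secret. */
    static String maskIfSensitive(String key, String value) {
        // Assumption: any key containing "password" is treated as sensitive.
        boolean sensitive = key.toLowerCase(Locale.ROOT).contains("password");
        return sensitive ? HIDDEN : value;
    }

    /** Formats the message part of a "Loading configuration property" log entry. */
    static String logLine(String key, String value) {
        return "Loading configuration property: " + key + ", " + maskIfSensitive(key, value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("security.ssl.enabled", "true");
        conf.put("security.ssl.keystore", "deploy-keys/ca.keystore");
        conf.put("security.ssl.keystore-password", "password");
        conf.forEach((key, value) -> System.out.println(logLine(key, value)));
    }
}
```

With a rule like this, the three password properties in the log excerpt would be printed with a placeholder, while the keystore and truststore paths would stay readable.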


Regards,
Vinay Patil




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/In-transit-Data-Encryption-in-EMR-tp13455p13490.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: In-transit Data Encryption in EMR

2017-06-05 Thread vinay patil
Hi Gordon,

Thank you for your response.

I have done the necessary configuration by adding the IPs of all the nodes
shown in the Resource Manager. Is this correct?
I will also check whether a wildcard works, as all our hostnames begin with
the same pattern.
For example, SAN=dns:ip-192-168.* should work, right?
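
Under the conservative matching rule of RFC 6125, a `*` is honoured only as the whole left-most DNS label and stands for exactly one label, so a pattern such as `ip-192-168.*` would not match any host. The plain-Java sketch below illustrates that rule only; `WildcardSanCheck` is a hypothetical helper, the sample host names are made up, and real TLS stacks may apply somewhat different rules.

```java
import java.util.Locale;

/** Hypothetical check of a SAN DNS pattern against a host name (conservative RFC 6125 reading). */
final class WildcardSanCheck {

    /**
     * Returns true if the host name matches the pattern. A "*" is accepted only
     * as the complete left-most label and matches exactly one label.
     */
    static boolean matches(String pattern, String hostname) {
        String[] p = pattern.toLowerCase(Locale.ROOT).split("\\.", -1);
        String[] h = hostname.toLowerCase(Locale.ROOT).split("\\.", -1);
        if (p.length != h.length) {
            return false; // a wildcard never spans several labels
        }
        for (int i = 0; i < p.length; i++) {
            if (p[i].isEmpty() || h[i].isEmpty()) {
                return false; // empty labels are not valid
            }
            if (p[i].contains("*")) {
                // Only a bare "*" in the left-most position is accepted here.
                if (i != 0 || !p[i].equals("*")) {
                    return false;
                }
            } else if (!p[i].equals(h[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.ec2.internal", "ip-192-168-1-10.ec2.internal"));
        System.out.println(matches("ip-192-168.*", "ip-192-168.internal"));
    }
}
```

If the cluster's host names share a common domain suffix, a left-most wildcard for that suffix is the form most likely to be accepted; otherwise each node has to be listed individually.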


I am also facing a strange issue when I try to submit the job using the
following command:
flink run -m yarn-cluster -yn 4 -ys 4 -yjm 1024 -ytm 4000 -yt deploy-keys/
testFlinkSSL.jar --configFileName conf.yaml

Error is : java.lang.IllegalArgumentException: Wrong FS:
hdfs://:8020/user/hadoop/.flink/application_1496660166576_0001/flink-dist_2.10-1.2.0.jar,
expected: file:///

I see a JIRA ticket about the same error but did not find any solution in
it.

Regards,
Vinay Patil





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/In-transit-Data-Encryption-in-EMR-tp13455p13489.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: In-transit Data Encryption in EMR

2017-06-05 Thread vinay patil
Thank you Till.

Gordon, can you please help?

Regards,
Vinay Patil

On Fri, Jun 2, 2017 at 9:10 PM, Till Rohrmann [via Apache Flink User
Mailing List archive.] <ml+s2336050n13459...@n4.nabble.com> wrote:

> Hi Vinay,
>
> I've pulled my colleague Gordon into the conversation who can probably
> tell you more about Flink's security features.
>
> Cheers,
> Till
>
> On Fri, Jun 2, 2017 at 2:22 PM, vinay patil <[hidden email]> wrote:
>
>> Hi,
>>
>> Currently I am looking into configuring in-transit data encryption either
>> using Flink SSL Setup or directly using EMR.
>>
>> Few Doubts:
>>1. Will the existing functionality provided by Amazon to configure
>> in-transit data encrytion work for Flink as well. This is explained here:
>> http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-encryption-enable-security-configuration.html
>> http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-data-encryption-options.html#emr-encryption-intransit
>>
>>2. Using Flink SSL Setup: as we know only the IP address of master node
>> on EMR , should we pass only its ip address in the SAN list as given here
>> ?
>> (I think it should work as the yarn-cli command will distribute the
>> truststore and keystore to each TM )
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html#use-yarn-cli-to-deploy-the-keystores-and-truststore
>>
>> Regards,
>> Vinay Patil
>>
>>
>>
>>
>
>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/In-transit-Data-Encryption-in-EMR-tp13455p13486.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

In-transit Data Encryption in EMR

2017-06-02 Thread vinay patil
Hi,

Currently I am looking into configuring in-transit data encryption either
using Flink SSL Setup or directly using EMR.

A few questions:
   1. Will the existing functionality provided by Amazon for configuring
in-transit data encryption work for Flink as well? It is explained here:
http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-encryption-enable-security-configuration.html
http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-data-encryption-options.html#emr-encryption-intransit

   2. Using the Flink SSL setup: since we know only the IP address of the
master node on EMR, should we pass only its IP address in the SAN list, as
described here? (I think it should work, as the yarn-cli command will
distribute the truststore and keystore to each TM.)
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html#use-yarn-cli-to-deploy-the-keystores-and-truststore

Regards,
Vinay Patil 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/In-transit-Data-Encryption-in-EMR-tp13455.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Queries regarding Historical Reprocessing

2017-05-03 Thread Vinay Patil
Hi Guys,

Can someone please help me in understanding this ?

Regards,
Vinay Patil

On Thu, Apr 27, 2017 at 12:36 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Guys,
>
> For historical reprocessing , I am reading the avro data from S3 and
> passing these records to the same pipeline for processing.
>
> I have the following queries:
>
> 1. I am running this pipeline as a stream application with checkpointing
> enabled, the records are successfully written to S3, however they remain in
> the pending state as checkpointing is not triggered when I doing
> re-processing. Why does this happen ? (kept the checkpointing interval to 1
> minute, pipeline ran for 10 minutes)
> this is the code I am using for reading avro data from S3
>
>
>
>
>
> AvroInputFormat<SomeAvroClass> avroInputFormat = new AvroInputFormat<>(
>     new org.apache.flink.core.fs.Path(s3Path), SomeAvroClass.class);
> sourceStream = env.createInput(avroInputFormat).map(...);
>
> 2. For the source stream Flink sets the parallelism as 1 , and for the
> rest of the operators the user specified parallelism is set. How does Flink
> reads the data ? does it bring the entire file from S3 one at a time  and
> then Split it according to parallelism ?
>
> 3. I am reading from two different S3 folders and treating them as
> separate sourceStreams, how does Flink reads data in this case ? does it
> pick one file from each S3 folder , split the data and pass it downstream ?
> Does Flink reads the data sequentially ? I am confused here as only one
> Task Manager is reading the data from S3 and then all TM's are getting the
> data.
>
> 4. Although I am running this as as stream application, the operators goes
> into FINISHED state after processing , is this because Flink treats the S3
> source as finite data ? What will happen if the data is continuously
> written to S3 from one pipeline and from the second pipeline I am doing
> historical re-processing ?
>
> Regards,
> Vinay Patil
>


Queries regarding Historical Reprocessing

2017-04-27 Thread Vinay Patil
Hi Guys,

For historical reprocessing , I am reading the avro data from S3 and
passing these records to the same pipeline for processing.

I have the following queries:

1. I am running this pipeline as a streaming application with checkpointing
enabled. The records are successfully written to S3, but they remain in the
pending state because checkpointing is not triggered while I am
re-processing. Why does this happen? (I set the checkpointing interval to 1
minute, and the pipeline ran for 10 minutes.)
This is the code I am using to read the Avro data from S3:





AvroInputFormat<SomeAvroClass> avroInputFormat = new AvroInputFormat<>(
    new org.apache.flink.core.fs.Path(s3Path), SomeAvroClass.class);
sourceStream = env.createInput(avroInputFormat).map(...);

2. For the source stream Flink sets the parallelism to 1, while the rest of
the operators use the user-specified parallelism. How does Flink read the
data? Does it fetch entire files from S3 one at a time and then split them
according to the parallelism?

3. I am reading from two different S3 folders and treating them as separate
source streams. How does Flink read the data in this case? Does it pick one
file from each S3 folder, split the data and pass it downstream? Does Flink
read the data sequentially? I am confused here, as only one Task Manager is
reading the data from S3 and then all TMs are getting the data.

4. Although I am running this as a streaming application, the operators go
into the FINISHED state after processing. Is this because Flink treats the S3
source as finite data? What will happen if data is continuously written to
S3 from one pipeline while I am doing historical re-processing from a second
pipeline?

Regards,
Vinay Patil


Queries regarding Historical Reprocessing

2017-04-26 Thread vinay patil
Hi Guys,

For historical reprocessing , I am reading the avro data from S3 and passing
these records to the same pipeline for processing. 

I have the following queries: 

1. I am running this pipeline as a streaming application with checkpointing
enabled. The records are successfully written to S3, but they remain in the
pending state because checkpointing is not triggered while I am
re-processing. Why does this happen? (I set the checkpointing interval to 1
minute, and the pipeline ran for 10 minutes.)
This is the code I am using to read the Avro data from S3:

AvroInputFormat<SomeAvroClass> avroInputFormat = new AvroInputFormat<>(
    new org.apache.flink.core.fs.Path(s3Path), SomeAvroClass.class);
sourceStream = env.createInput(avroInputFormat).map(...);

2. For the source stream Flink sets the parallelism to 1, while the rest of
the operators use the user-specified parallelism. How does Flink read the
data? Does it fetch entire files from S3 one at a time and then split them
according to the parallelism?

3. I am reading from two different S3 folders and treating them as separate
source streams. How does Flink read the data in this case? Does it pick one
file from each S3 folder, split the data and pass it downstream? Does Flink
read the data sequentially? I am confused here, as only one Task Manager is
reading the data from S3 and then all TMs are getting the data.

4. Although I am running this as a streaming application, the operators go
into the FINISHED state after processing. Is this because Flink treats the S3
source as finite data? What will happen if data is continuously written to
S3 from one pipeline while I am doing historical re-processing from a second
pipeline?

Regards,
Vinay Patil



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-Historical-Reprocessing-tp12833.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpointing with RocksDB as statebackend

2017-03-27 Thread vinay patil
Hi Stephan,

Just an update, last week I did a run with state size close to 18GB, I did
not observe the pipeline getting stopped in between with G1GC enabled.

I had observed checkpoint failures when the state size was close to 38GB
(but in this case G1GC was not enabled)

Is it possible to get the RocksDB fix into 1.2.1 so that I can test it out?


Regards,
Vinay Patil

On Sat, Mar 18, 2017 at 12:25 AM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n1227...@n4.nabble.com> wrote:

> @vinay Let's see how fast we get this fix in - I hope yes. It may depend
> also a bit on the RocksDB community.
>
> In any case, if it does not make it in, we can do a 1.2.2 release
> immediately after (I think the problem is big enough to warrant that), or
> at least release a custom version of the RocksDB state backend that
> includes the fix.
>
> Stephan
>
>
> On Fri, Mar 17, 2017 at 5:51 PM, vinay patil <[hidden email]> wrote:
>
>> Hi Stephan,
>>
>> Is the performance related change  of RocksDB going to be part of Flink
>> 1.2.1 ?
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Mar 16, 2017 at 6:13 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>>> The only immediate workaround is to use windows with "reduce" or "fold"
>>> or "aggregate" and not "apply". And to not use an evictor.
>>>
>>> The good news is that I think we have a good way of fixing this soon,
>>> making an adjustment in RocksDB.
>>>
>>> For the Yarn / g1gc question: Not 100% sure about that - you can check
>>> if it used g1gc. If not, you may be able to pass this through the
>>> "env.java.opts" parameter. (cc robert for confirmation)
>>>
>>> Stephan
>>>
>>>
>>>
>>> On Thu, Mar 16, 2017 at 8:31 AM, vinay patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node=12243=0>> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> What can be the workaround for this ?
>>>>
>>>> Also need one confirmation : Is G1 GC used by default when running the
>>>> pipeline on YARN. (I see a thread of 2015 where G1 is used by default for
>>>> JAVA8)
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
>>>> Mailing List archive.] <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node=12234=0>> wrote:
>>>>
>>>>> Hi Vinay!
>>>>>
>>>>> Savepoints also call the same problematic RocksDB function,
>>>>> unfortunately.
>>>>>
>>>>> We will have a fix next month. We either (1) get a patched RocksDB
>>>>> version or we (2) implement a different pattern for ListState in Flink.
>>>>>
>>>>> (1) would be the better solution, so we are waiting for a response
>>>>> from the RocksDB folks. (2) is always possible if we cannot get a fix from
>>>>> RocksDB.
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12225=0>> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> Thank you for making me aware of this.
>>>>>>
>>>>>> Yes I am using a window without reduce function (Apply function). The
>>>>>> discussion happening on JIRA is exactly what I am observing, consistent
>>>>>> failure of checkpoints after some time and the stream halts.
>>>>>>
>>>>>> We want to go live in next month, not sure how this will affect in
>>>>>> production as we are going to get above 200 million data.
>>>>>>
>>>>>> As a workaround can I take the savepoint while the pipeline is
>>>>>> running ? Let's say if I take savepoint after every 30minutes, will it
>>>>>> work ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Tue, Mar 14, 2017 at 10:02 PM, Ste

Re: Checkpointing with RocksDB as statebackend

2017-03-17 Thread vinay patil
Hi Stephan,

Is the performance-related RocksDB change going to be part of Flink
1.2.1?

Regards,
Vinay Patil

On Thu, Mar 16, 2017 at 6:13 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n12243...@n4.nabble.com> wrote:

> The only immediate workaround is to use windows with "reduce" or "fold" or
> "aggregate" and not "apply". And to not use an evictor.
>
> The good news is that I think we have a good way of fixing this soon,
> making an adjustment in RocksDB.
>
> For the Yarn / g1gc question: Not 100% sure about that - you can check if
> it used g1gc. If not, you may be able to pass this through the
> "env.java.opts" parameter. (cc robert for confirmation)
>
> Stephan
>
>
>
> On Thu, Mar 16, 2017 at 8:31 AM, vinay patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node=12243=0>> wrote:
>
>> Hi Stephan,
>>
>> What can be the workaround for this ?
>>
>> Also need one confirmation : Is G1 GC used by default when running the
>> pipeline on YARN. (I see a thread of 2015 where G1 is used by default for
>> JAVA8)
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=12234=0>> wrote:
>>
>>> Hi Vinay!
>>>
>>> Savepoints also call the same problematic RocksDB function,
>>> unfortunately.
>>>
>>> We will have a fix next month. We either (1) get a patched RocksDB
>>> version or we (2) implement a different pattern for ListState in Flink.
>>>
>>> (1) would be the better solution, so we are waiting for a response from
>>> the RocksDB folks. (2) is always possible if we cannot get a fix from
>>> RocksDB.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node=12225=0>> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> Thank you for making me aware of this.
>>>>
>>>> Yes I am using a window without reduce function (Apply function). The
>>>> discussion happening on JIRA is exactly what I am observing, consistent
>>>> failure of checkpoints after some time and the stream halts.
>>>>
>>>> We want to go live in next month, not sure how this will affect in
>>>> production as we are going to get above 200 million data.
>>>>
>>>> As a workaround can I take the savepoint while the pipeline is running
>>>> ? Let's say if I take savepoint after every 30minutes, will it work ?
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
>>>> Mailing List archive.] <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node=12224=0>> wrote:
>>>>
>>>>> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>>>>>
>>>>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=0>> wrote:
>>>>>
>>>>>> Hi Vinay,
>>>>>>
>>>>>> I think the issue is tracked here:
>>>>>> https://github.com/facebook/rocksdb/issues/1988.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node=12209=1>>:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> Is there a ticket number/link to track this, My job has all the
>>>>>> conditions you mentioned.
>>>>>>
>>>>>> Thanks,
>>>>>> Vishnu
>>>>>>
>>>>>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node=12209=2>> wrote:
>>>>>>
>>>>>>> Hi Vinay!
>>>>>>>
>>>>>>> We just discovered a bug in RocksDB. The bug affects windows without
>>>>>>> reduce() or fold(), windows with evictors, and ListState.
>>>>>>>
>>>

Re: Checkpointing with RocksDB as statebackend

2017-03-16 Thread vinay patil
@Stephan,

I am not using an explicit Evictor in my code. I will try using the Fold
function if it does not break my existing functionality :)
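
For context on why the suggested workaround helps: with an apply-style window function every element is buffered (list-like state) until the window fires, whereas a reduce or fold keeps a single running value per key and window. The sketch below is plain Java, not Flink API; the class and method names are made up for illustration and only model how much state each approach holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class WindowStateSketch {

    /** Apply-style window: buffers every element until the window fires. */
    static final class BufferingWindow {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        /** Number of values currently held as state. */
        int stateSize() {
            return elements.size();
        }

        /** Computes the result only at firing time, over the whole buffer. */
        long fire() {
            long sum = 0;
            for (long v : elements) {
                sum += v;
            }
            return sum;
        }
    }

    /** Reduce-style window: folds each element into one running value. */
    static final class ReducingWindow {
        private final BinaryOperator<Long> reducer;
        private Long current; // null until the first element arrives

        ReducingWindow(BinaryOperator<Long> reducer) {
            this.reducer = reducer;
        }

        void add(long value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }

        /** At most one value is ever held as state. */
        int stateSize() {
            return current == null ? 0 : 1;
        }

        long fire() {
            return current == null ? 0L : current;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        ReducingWindow reducing = new ReducingWindow(Long::sum);
        for (long i = 1; i <= 1000; i++) {
            buffering.add(i);
            reducing.add(i);
        }
        System.out.println("buffered state entries: " + buffering.stateSize());
        System.out.println("reduced state entries:  " + reducing.stateSize());
        System.out.println("same result: " + (buffering.fire() == reducing.fire()));
    }
}
```

Whether a given job can be rewritten this way depends on whether its window logic can be expressed as an incremental combination of two values.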

@Robert: Thank you for your answer. Yes, I already tried setting G1GC
this morning using env.java.opts, and it works.
Which GC is recommended for a streaming application (running on YARN -
EMR)?
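
One way to check which collector a JVM is actually using, as suggested earlier in the thread, is to list its garbage collector MXBeans. This is a minimal sketch using only the standard java.lang.management API; the class name GcCheck is hypothetical, and it reports on the JVM it runs in, so to inspect a TaskManager the same calls would have to run inside the job.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

public class GcCheck {

    /** Names of the garbage collectors registered in the current JVM. */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    /** True if any collector name starts with "G1" (HotSpot uses names like "G1 Young Generation"). */
    static boolean usesG1(List<String> names) {
        for (String name : names) {
            if (name.startsWith("G1")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> names = collectorNames();
        System.out.println("collectors: " + names);
        System.out.println("G1 in use: " + usesG1(names));
    }
}
```

Running it as `java -XX:+UseG1GC GcCheck` should report G1 on a HotSpot JVM; the same JVM flag is what would be passed through env.java.opts.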

Regards,
Vinay Patil

On Thu, Mar 16, 2017 at 6:36 PM, rmetzger0 [via Apache Flink User Mailing
List archive.] <ml-node+s2336050n12244...@n4.nabble.com> wrote:

> Yes, you can change the GC using the env.java.opts parameter.
> We are not setting any GC on YARN.
>
> On Thu, Mar 16, 2017 at 1:50 PM, Stephan Ewen <[hidden email]
> <http:///user/SendEmail.jtp?type=node=12244=0>> wrote:
>
>> The only immediate workaround is to use windows with "reduce" or "fold"
>> or "aggregate" and not "apply". And to not use an evictor.
>>
>> The good news is that I think we have a good way of fixing this soon,
>> making an adjustment in RocksDB.
>>
>> For the Yarn / g1gc question: Not 100% sure about that - you can check if
>> it used g1gc. If not, you may be able to pass this through the
>> "env.java.opts" parameter. (cc robert for confirmation)
>>
>> Stephan
>>
>>
>>
>> On Thu, Mar 16, 2017 at 8:31 AM, vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=12244=1>> wrote:
>>
>>> Hi Stephan,
>>>
>>> What can be the workaround for this ?
>>>
>>> Also need one confirmation : Is G1 GC used by default when running the
>>> pipeline on YARN. (I see a thread of 2015 where G1 is used by default for
>>> JAVA8)
>>>
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node=12234=0>> wrote:
>>>
>>>> Hi Vinay!
>>>>
>>>> Savepoints also call the same problematic RocksDB function,
>>>> unfortunately.
>>>>
>>>> We will have a fix next month. We either (1) get a patched RocksDB
>>>> version or we (2) implement a different pattern for ListState in Flink.
>>>>
>>>> (1) would be the better solution, so we are waiting for a response from
>>>> the RocksDB folks. (2) is always possible if we cannot get a fix from
>>>> RocksDB.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node=12225=0>> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> Thank you for making me aware of this.
>>>>>
>>>>> Yes I am using a window without reduce function (Apply function). The
>>>>> discussion happening on JIRA is exactly what I am observing, consistent
>>>>> failure of checkpoints after some time and the stream halts.
>>>>>
>>>>> We want to go live in next month, not sure how this will affect in
>>>>> production as we are going to get above 200 million data.
>>>>>
>>>>> As a workaround can I take the savepoint while the pipeline is running
>>>>> ? Let's say if I take savepoint after every 30minutes, will it work ?
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
>>>>> Mailing List archive.] <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12224=0>> wrote:
>>>>>
>>>>>> The issue in Flink is
>>>>>> https://issues.apache.org/jira/browse/FLINK-5756
>>>>>>
>>>>>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node=12209=0>> wrote:
>>>>>>
>>>>>>> Hi Vinay,
>>>>>>>
>>>>>>> I think the issue is tracked here:
>>>>>>> https://github.com/facebook/rocksdb/issues/1988.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>>>>&

Re: Checkpointing with RocksDB as statebackend

2017-03-16 Thread vinay patil
Hi Stephan,

What can be the workaround for this?

I also need one confirmation: is G1 GC used by default when running the
pipeline on YARN? (I saw a thread from 2015 which says G1 is used by default
for Java 8.)



Regards,
Vinay Patil

On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n12225...@n4.nabble.com> wrote:

> Hi Vinay!
>
> Savepoints also call the same problematic RocksDB function, unfortunately.
>
> We will have a fix next month. We either (1) get a patched RocksDB version
> or we (2) implement a different pattern for ListState in Flink.
>
> (1) would be the better solution, so we are waiting for a response from
> the RocksDB folks. (2) is always possible if we cannot get a fix from
> RocksDB.
>
> Stephan
>
>
> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node=12225=0>> wrote:
>
>> Hi Stephan,
>>
>> Thank you for making me aware of this.
>>
>> Yes I am using a window without reduce function (Apply function). The
>> discussion happening on JIRA is exactly what I am observing, consistent
>> failure of checkpoints after some time and the stream halts.
>>
>> We want to go live in next month, not sure how this will affect in
>> production as we are going to get above 200 million data.
>>
>> As a workaround can I take the savepoint while the pipeline is running ?
>> Let's say if I take savepoint after every 30minutes, will it work ?
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=12224=0>> wrote:
>>
>>> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>>>
>>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node=12209=0>> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> I think the issue is tracked here:
>>>> https://github.com/facebook/rocksdb/issues/1988.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node=12209=1>>:
>>>>
>>>> Hi Stephan,
>>>>
>>>> Is there a ticket number/link to track this, My job has all the
>>>> conditions you mentioned.
>>>>
>>>> Thanks,
>>>> Vishnu
>>>>
>>>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node=12209=2>> wrote:
>>>>
>>>>> Hi Vinay!
>>>>>
>>>>> We just discovered a bug in RocksDB. The bug affects windows without
>>>>> reduce() or fold(), windows with evictors, and ListState.
>>>>>
>>>>> A certain access pattern in RocksDB starts being so slow after a
>>>>> certain size-per-key that it basically brings down the streaming program
>>>>> and the snapshots.
>>>>>
>>>>> We are reaching out to the RocksDB folks and looking for workarounds
>>>>> in Flink.
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=3>> wrote:
>>>>>
>>>>>> @vinay  Can you try to not set the buffer timeout at all? I am
>>>>>> actually not sure what would be the effect of setting it to a negative
>>>>>> value, that can be a cause of problems...
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node=12209=4>> wrote:
>>>>>>
>>>>>>> Vinay,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The bucketing sink performs rename operations during the checkpoint
>>>>>>> and if it tries to rename a file that is not yet consistent that would
>>>>>>> cause a FileNotFound exception which would fail the checkpoint.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Stephan,
>>>>&

Re: Checkpointing with RocksDB as statebackend

2017-03-15 Thread vinay patil
Hi Stephan,

Thank you for making me aware of this.

Yes, I am using a window without a reduce function (an apply function). The
discussion happening on JIRA is exactly what I am observing: consistent
failure of checkpoints after some time, and then the stream halts.

We want to go live next month, and I am not sure how this will affect
production, as we are going to receive more than 200 million records.

As a workaround, can I take a savepoint while the pipeline is running?
Let's say I take a savepoint every 30 minutes, will it work?



Regards,
Vinay Patil

On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n12209...@n4.nabble.com> wrote:

> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>
> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
> <http:///user/SendEmail.jtp?type=node=12209=0>> wrote:
>
>> Hi Vinay,
>>
>> I think the issue is tracked here:
>> https://github.com/facebook/rocksdb/issues/1988.
>>
>> Best,
>> Stefan
>>
>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=12209=1>>:
>>
>> Hi Stephan,
>>
>> Is there a ticket number/link to track this, My job has all the
>> conditions you mentioned.
>>
>> Thanks,
>> Vishnu
>>
>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=12209=2>> wrote:
>>
>>> Hi Vinay!
>>>
>>> We just discovered a bug in RocksDB. The bug affects windows without
>>> reduce() or fold(), windows with evictors, and ListState.
>>>
>>> A certain access pattern in RocksDB starts being so slow after a certain
>>> size-per-key that it basically brings down the streaming program and the
>>> snapshots.
>>>
>>> We are reaching out to the RocksDB folks and looking for workarounds in
>>> Flink.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node=12209=3>> wrote:
>>>
>>>> @vinay  Can you try to not set the buffer timeout at all? I am actually
>>>> not sure what would be the effect of setting it to a negative value, that
>>>> can be a cause of problems...
>>>>
>>>>
>>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node=12209=4>> wrote:
>>>>
>>>>> Vinay,
>>>>>
>>>>>
>>>>>
>>>>> The bucketing sink performs rename operations during the checkpoint
>>>>> and if it tries to rename a file that is not yet consistent that would
>>>>> cause a FileNotFound exception which would fail the checkpoint.
>>>>>
>>>>>
>>>>>
>>>>> Stephan,
>>>>>
>>>>>
>>>>>
>>>>> Currently my aws fork contains some very specific assumptions about
>>>>> the pipeline that will in general only hold for my pipeline. This is
>>>>> because there were still some open questions that I had about how to
>>>>> solve consistency issues in the general case. I will comment on the Jira
>>>>> issue with more specifics.
>>>>>
>>>>>
>>>>>
>>>>> Seth Wiesman
>>>>>
>>>>>
>>>>>
>>>>> *From: *vinay patil <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=5>>
>>>>> *Reply-To: *"[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=6>" <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=7>>
>>>>> *Date: *Monday, February 27, 2017 at 1:05 PM
>>>>> *To: *"[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=8>" <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=12209=9>>
>>>>>
>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>
>>>>>
>>>>>
>>>>> Hi Seth,
>>>>>
>>>>> Thank you for your suggestion.
>>>>>
>>>>> But if the issue is only related to S3, then why does this happen when
>>>>> I replace the S

Re: Frequent Full GC's in case of FSStateBackend

2017-03-08 Thread vinay patil
Hi Sai,

If you are sure that your state will not exceed the memory limit of the nodes,
then you should consider FsStateBackend; otherwise you should go for RocksDB.

What is the configuration of your cluster?

On Mar 9, 2017 7:31 AM, "saiprasad mishra [via Apache Flink User Mailing
List archive.]" <ml-node+s2336050n12126...@n4.nabble.com> wrote:

> Hi All
>
> I am also seeing issues with FsStateBackend as it stalls coz of full gc.
> We have very large state,
> Does this mean the below doc should not claim that FsStateBackend is
> encouraged for large state.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html#the-fsstatebackend
>
> Regards
> Sai
>
> On Fri, Feb 10, 2017 at 6:19 AM, Stefan Richter <[hidden email]
> <http:///user/SendEmail.jtp?type=node=12126=0>> wrote:
>
>> Async snapshotting is the default.
>>
>> Am 10.02.2017 um 14:03 schrieb vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=12126=1>>:
>>
>> Hi Stephan,
>>
>> Thank you for the clarification.
>> Yes with RocksDB I don't see Full GC happening, also I am using Flink
>> 1.2.0 version and I have set the statebackend in flink-conf.yaml file to
>> rocksdb, so by default does this do asynchronous checkpointing or I have to
>> specify it at the job level  ?
>>
>> Regards,
>> Vinay Patil
>>
>> On Fri, Feb 10, 2017 at 4:16 PM, Stefan Richter [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>>> Hi,
>>>
>>> FSStateBackend operates completely on-heap and only snapshots for
>>> checkpoints go against the file system. This is why the backend is
>>> typically faster for small states, but can become problematic for larger
>>> states. If your state exceeds a certain size, you should strongly consider
>>> to use RocksDB as backend. In particular, RocksDB also offers asynchronous
>>> snapshots which is very valuable to keep stream processing running for
>>> large state. RocksDB works on native memory/disk, so there is no GC to
>>> observe. For cases in which your state fits in memory but GC is a problem
>>> you could try using the G1 garbage collector which offers better
>>> performance for the FSStateBackend than the default.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>> Am 10.02.2017 um 11:16 schrieb Vinay Patil <[hidden email]
>>> <http://user/SendEmail.jtp?type=node=11565=0>>:
>>>
>>> Hi,
>>>
>>> I am doing performance test for my pipeline keeping FSStateBackend, I
>>> have observed frequent Full GC's after processing 20M records.
>>>
>>> When I did memory analysis using MAT, it showed that the many objects
>>> maintained by Flink state are live.
>>>
>>> Flink keeps the state in memory even after checkpointing , when does
>>> this state gets removed / GC. (I am using window operator in which the DTO
>>> comes as input)
>>>
>>> Also why does Flink keep the state in memory after checkpointing ?
>>>
>>> P.S Using RocksDB is not causing Full GC at all.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>>
>>>
>>> --
>>> If you reply to this email, your message will be added to the discussion
>>> below:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-Full-GC-s-in-case-of-FSStateBackend-tp11564p11565.html
>>>
>>
>>
>> --
>> View this message in context: Re: Frequent Full GC's in case of
>> FSStateBackend
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-Full-GC-s-in-case-of-FSStateBackend-tp11564p11568.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.co

Re: Integrate Flink with S3 on EMR cluster

2017-03-08 Thread vinay patil
Hi ,

@Shannon - I am not facing any issue while writing to S3; I was getting
NoClassDef errors when reading the file from S3.

By "Hadoop File System" I mean that I am using Hadoop's FileSystem class to
read the file from S3.

@Stephan - I tried with 1.1.4 and got the same issue.

The easiest way I found is to run the "hadoop classpath" command and export
its output as the HADOOP_CLASSPATH variable.

This way we don't have to copy any S3-specific jars to the Flink lib folder.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p12101.html


Re: Integrate Flink with S3 on EMR cluster

2017-03-07 Thread vinay patil
Hi Guys,

Has anyone got this error before? If yes, have you found any other
solution apart from copying the jar files to the Flink lib folder?

Regards,
Vinay Patil

On Mon, Mar 6, 2017 at 8:21 PM, vinay patil [via Apache Flink User Mailing
List archive.] <ml-node+s2336050n12053...@n4.nabble.com> wrote:

> Hi Guys,
>
> I am getting the same exception:
> EMRFileSystem not Found
>
> I am trying to read encrypted S3 file using Hadoop File System class.
>  (using Flink 1.2.0)
> When I copy all the libs from /usr/share/aws/emrfs/lib and /usr/lib/hadoop
> to Flink lib folder , it works.
>
> However I see that all these libs are already included in the Hadoop
> classpath.
>
> Is there any other way I can make this work ?
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p12053.html
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p12072.html

Re: Integrate Flink with S3 on EMR cluster

2017-03-06 Thread vinay patil
Hi Guys,

I am getting the same exception:
EMRFileSystem not Found

I am trying to read an encrypted S3 file using the Hadoop FileSystem class
(using Flink 1.2.0).
When I copy all the libs from /usr/share/aws/emrfs/lib and /usr/lib/hadoop
to the Flink lib folder, it works.

However, I see that all these libs are already included in the Hadoop
classpath.

Is there any other way I can make this work?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p12053.html


Re: Checkpointing with RocksDB as statebackend

2017-02-27 Thread vinay patil
Hi Seth,

Thank you for your suggestion.

But if the issue is only related to S3, then why does this also happen when I
replace the S3 sink with HDFS (for checkpointing I am using HDFS only)?

Stephan,
Another issue I see is that when I set env.setBufferTimeout(-1) and keep the
checkpoint interval at 10 minutes, nothing gets written
to the sink (tried with S3 as well as HDFS); I was expecting at least pending
files here.
This issue gets worse when checkpointing is disabled, as nothing is written.



Regards,
Vinay Patil

On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n11943...@n4.nabble.com> wrote:

> Hi Seth!
>
> Wow, that is an awesome approach.
>
> We have actually seen these issues as well and we are looking to
> eventually implement our own S3 file system (and circumvent Hadoop's S3
> connector that Flink currently relies on):
> https://issues.apache.org/jira/browse/FLINK-5706
>
> Do you think your patch would be a good starting point for that and would
> you be willing to share it?
>
> The Amazon AWS SDK for Java is Apache 2 licensed, so that is possible to
> fork officially, if necessary...
>
> Greetings,
> Stephan
>
>
>
> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]
> <http:///user/SendEmail.jtp?type=node=11943=0>> wrote:
>
>> Just wanted to throw in my 2cts.
>>
>>
>>
>> I’ve been running pipelines with similar state size using rocksdb which
>> externalize to S3 and bucket to S3. I was getting stalls like this and
>> ended up tracing the problem to S3 and the bucketing sink. The solution was
>> two fold:
>>
>>
>>
>> 1)   I forked hadoop-aws and have it treat flink as a source of
>> truth. Emr uses a dynamodb table to determine if S3 is inconsistent.
>> Instead I say that if flink believes that a file exists on S3 and we don’t
>> see it then I am going to trust that flink is in a consistent state and S3
>> is not. In this case, various operations will perform a back off and retry
>> up to a certain number of times.
>>
>>
>>
>> 2)   The bucketing sink performs multiple renames over the lifetime
>> of a file, occurring when a checkpoint starts and then again on
>> notification after it completes. Due to S3’s consistency guarantees the
>> second rename of file can never be assured to work and will eventually fail
>> either during or after a checkpoint. Because there is no upper bound on the
>> time it will take for a file on S3 to become consistent, retries cannot
>> solve this specific problem as it could take upwards of many minutes to
>> rename which would stall the entire pipeline. The only viable solution I
>> could find was to write a custom sink which understands S3. Each writer
>> will write file locally and then copy it to S3 on checkpoint. By only
>> interacting with S3 once per file it can circumvent consistency issues all
>> together.
>>
>>
>>
>> Hope this helps,
>>
>>
>>
>> Seth Wiesman
>>
>>
>>
>> *From: *vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=11943=1>>
>> *Reply-To: *"[hidden email]
>> <http:///user/SendEmail.jtp?type=node=11943=2>" <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=11943=3>>
>> *Date: *Saturday, February 25, 2017 at 10:50 AM
>> *To: *"[hidden email]
>> <http:///user/SendEmail.jtp?type=node=11943=4>" <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=11943=5>>
>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>
>>
>>
>> HI Stephan,
>>
>> Just to avoid the confusion here, I am using S3 sink for writing the
>> data, and using HDFS for storing checkpoints.
>>
>> There are 2 core nodes (HDFS) and two task nodes on EMR
>>
>>
>> I replaced s3 sink with HDFS for writing data in my last test.
>>
>> Let's say the checkpoint interval is 5 minutes, now within 5minutes of
>> run the state size grows to 30GB ,  after checkpointing the 30GB state that
>> is maintained in rocksDB has to be copied to HDFS, right ?  is this causing
>> the pipeline to stall ?
>>
>>
>> Regards,
>>
>> Vinay Patil
>>
>>
>>
>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>
>> Hi Stephan,
>>
>> To verify if S3 is making the pipeline stall, I have replaced the S3 sink
>> with HDFS and kept minimum pause between checkpoints to 5minutes, still I
>> see the same issue 

Re: Checkpointing with RocksDB as statebackend

2017-02-25 Thread vinay patil
Hi Stephan,

Just to avoid confusion here: I am using an S3 sink for writing the data,
and HDFS for storing checkpoints.
There are 2 core nodes (HDFS) and two task nodes on EMR.

I replaced the S3 sink with HDFS for writing data in my last test.

Let's say the checkpoint interval is 5 minutes. If within 5 minutes of running
the state size grows to 30GB, then on checkpointing the 30GB of state that is
maintained in RocksDB has to be copied to HDFS, right? Is this causing
the pipeline to stall?
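
A rough way to reason about this question is to compute the sustained write rate needed to move the whole state within one checkpoint interval. The following is only a back-of-the-envelope sketch: it assumes each checkpoint copies the full 30GB (no incremental snapshots) and 1GB = 1024MB, and the class and method names are hypothetical.

```java
public class CheckpointBandwidth {

    /** MB/s needed to copy stateGb gigabytes within windowSeconds seconds. */
    static double requiredMbPerSecond(double stateGb, long windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive");
        }
        return stateGb * 1024.0 / windowSeconds;
    }

    public static void main(String[] args) {
        // 30GB of state, 5 minute checkpoint interval (figures from the message above).
        double rate = requiredMbPerSecond(30.0, 5 * 60);
        System.out.printf("30GB in 5 minutes needs about %.1f MB/s in total%n", rate);
    }
}
```

That works out to roughly 102.4 MB/s in aggregate across the task nodes. If the cluster cannot sustain that rate towards HDFS, a checkpoint would be expected to still be running when the next one is due.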


Regards,
Vinay Patil

On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Stephan,
>
> To verify if S3 is making the pipeline stall, I have replaced the S3 sink
> with HDFS and kept minimum pause between checkpoints to 5minutes, still I
> see the same issue with checkpoints failing.
>
> If I keep the  pause time to 20 seconds, all checkpoints are completed ,
> however there is a hit in overall throughput.
>
>
>
>
> Regards,
> Vinay Patil
>
> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <ml-node+s2336050n11891...@n4.nabble.com> wrote:
>
>> Flink's state backends currently do a good number of "make sure this
>> exists" operations on the file systems. Through Hadoop's S3 filesystem,
>> that translates to S3 bucket list operations, where there is a limit in how
>> many operation may happen per time interval. After that, S3 blocks.
>>
>> It seems that operations that are totally cheap on HDFS are hellishly
>> expensive (and limited) on S3. It may be that you are affected by that.
>>
>> We are gradually trying to improve the behavior there and be more S3
>> aware.
>>
>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>
>> Best,
>> Stephan
>>
>>
>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=11891=0>> wrote:
>>
>>> Hi Stephan,
>>>
>>> So do you mean that S3 is causing the stall? As I mentioned in my
>>> previous mail, I could not see any progress for 16 minutes because
>>> checkpoints were failing continuously.
>>>
>>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing
>>> List archive.]" <[hidden email]> wrote:
>>>
>>>> Hi Vinay!
>>>>
>>>> True, the operator state (like Kafka) is currently not asynchronously
>>>> checkpointed.
>>>>
>>>> While it is rather small state, we have seen before that on S3 it can
>>>> cause trouble, because S3 frequently stalls uploads of even data amounts as
>>>> low as kilobytes due to its throttling policies.
>>>>
>>>> That would be a super important fix to add!
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have attached a snapshot for reference:
>>>>> As you can see, all 3 checkpoints failed; checkpoints 2 and 3 are
>>>>> stuck at the Kafka source after 50%.
>>>>> (So far Kafka source 1 has sent 65GB and source 2 has sent 15GB.)
>>>>>
>>>>> Within 10 minutes 15M records were processed, and for the next
>>>>> 16 minutes the pipeline has been stuck. I don't see any progress
>>>>> beyond 15M because checkpoints are failing consistently.
>>>>>
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
>>>>>

Re: Checkpointing with RocksDB as statebackend

2017-02-24 Thread vinay patil
Hi Stephan,

So do you mean that S3 is causing the stall? As I mentioned in my
previous mail, I could not see any progress for 16 minutes because
checkpoints were failing continuously.

On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List
archive.]" <ml-node+s2336050n11885...@n4.nabble.com> wrote:

> Hi Vinay!
>
> True, the operator state (like Kafka) is currently not asynchronously
> checkpointed.
>
> While it is rather small state, we have seen before that on S3 it can
> cause trouble, because S3 frequently stalls uploads of even data amounts as
> low as kilobytes due to its throttling policies.
>
> That would be a super important fix to add!
>
> Best,
> Stephan
>
>
> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>
>> Hi,
>>
>> I have attached a snapshot for reference:
>> As you can see, all 3 checkpoints failed; checkpoints 2 and 3 are
>> stuck at the Kafka source after 50%.
>> (So far Kafka source 1 has sent 65GB and source 2 has sent 15GB.)
>>
>> Within 10 minutes 15M records were processed, and for the next
>> 16 minutes the pipeline has been stuck. I don't see any progress
>> beyond 15M because checkpoints are failing consistently.
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11887.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Checkpointing with RocksDB as statebackend

2017-02-24 Thread vinay patil
Hi,

I have attached a snapshot for reference:
As you can see, all 3 checkpoints failed; checkpoints 2 and 3 are
stuck at the Kafka source after 50%.
(So far Kafka source 1 has sent 65GB and source 2 has sent 15GB.)

Within 10 minutes 15M records were processed, and for the next 16 minutes
the pipeline has been stuck. I don't see any progress beyond 15M because
checkpoints are failing consistently.


 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpointing with RocksDB as statebackend

2017-02-24 Thread vinay patil
Hi Stephan,

Thank you for the brief explanation.

Yes, I have already enabled object reuse mode, which gave a significant
improvement.

I am currently running on r3.4xlarge instances with 122GB of memory. As you
suggested, I increased the checkpoint interval to 10 minutes with a minimum
pause between checkpoints of 5 minutes. With that, the complete processing
finished in 8 minutes :) (before even a single checkpoint was triggered)

That's why I decreased the checkpoint interval to 3 minutes, but I observed
that the pipeline stops for a long time during checkpoints. Here the Kafka
source took the longest to acknowledge and complete the checkpoints
(4-minute timeout), and it failed 3 consecutive times.
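
The interplay of the interval and the minimum pause can be sketched with a simplified model. It assumes the first checkpoint is triggered one interval after job start, that each checkpoint takes a fixed time, and that a new checkpoint starts only once both the interval since the previous start and the pause since the previous end have elapsed. `checkpoint_starts` and the 60 s duration are illustrative assumptions, not Flink code or measured values.

```python
def checkpoint_starts(interval_s, min_pause_s, duration_s, count):
    """Start times (seconds after job start) of `count` checkpoints.

    Simplified model: a checkpoint starts no earlier than `interval_s`
    after the previous start and no earlier than `min_pause_s` after the
    previous checkpoint ended (completed or timed out).
    """
    starts = []
    start = interval_s
    for _ in range(count):
        starts.append(start)
        end = start + duration_s
        start = max(start + interval_s, end + min_pause_s)
    return starts


# Interval 10 min, pause 5 min: the first trigger is at 600 s,
# after the 8-minute (480 s) run described above has already finished.
print(checkpoint_starts(600, 300, 60, 3))   # [600, 1200, 1800]

# Interval 3 min, every attempt assumed to hit the 4-minute timeout,
# and the 5-minute pause assumed unchanged (the message does not say).
print(checkpoint_starts(180, 300, 240, 3))  # [180, 720, 1260]
```

In the second case the pause, not the interval, decides when the next attempt starts, so shortening the interval alone does not make checkpoints more frequent in this model.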

Can't we make Kafka do asynchronous checkpoints? I see consistent checkpoint
failures for Kafka. I have not observed window checkpoints failing, as they
are done asynchronously.

 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11879.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpointing with RocksDB as statebackend

2017-02-23 Thread vinay patil
Hi,

When I disable checkpointing, the memory usage is similar on all nodes.
This means that with checkpointing enabled the data is first flushed to the
memory of the CORE nodes (the DataNode daemon runs there on EMR).

I am going to run with FsStateBackend on a high-end cluster with 122GB RAM.
Does FsStateBackend use TM heap memory or physical memory to store the
state?

Regards,
Vinay Patil

On Thu, Feb 23, 2017 at 7:50 PM, vinay patil [via Apache Flink User Mailing
List archive.] <ml-node+s2336050n11831...@n4.nabble.com> wrote:

> Hi Stephan,
>
> Anyway, the Async exception is gone.
>
> I have increased my instance type to r3.2xlarge, which has 60GB of memory.
> But what I have observed is that for two task managers the memory usage
> is close to 30GB, while for the other two it goes up to 55GB, even though
> the load is equally distributed among all TMs.
> Why does this happen?
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11845.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Checkpointing with RocksDB as statebackend

2017-02-23 Thread vinay patil
Hi Stephan,

Anyway, the Async exception is gone.

I have increased my instance type to r3.2xlarge, which has 60GB of memory.
But what I have observed is that for two task managers the memory usage
is close to 30GB, while for the other two it goes up to 55GB, even though
the load is equally distributed among all TMs.
Why does this happen?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11831.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink checkpointing gets stuck

2017-02-21 Thread vinay patil
Hi Shai,

I checked online and Azure DS5_v2 has SSD storage, so why don't you try the
FLASH_SSD_OPTIMIZED option?

In my case as well the stream was getting stuck for a few minutes; my
checkpoint duration is 6 seconds and minimumPauseIntervalBetweenCheckpoints
is 5 seconds.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/large_state_tuning.html

I think if the writes to RocksDB are blocked, the stream can block for a
certain interval:
https://github.com/facebook/rocksdb/wiki/Write-Stalls

First try the FLASH_SSD_OPTIMIZED option, and don't give unnecessarily high
heap memory to the TM, as RocksDB also uses physical memory.
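
The heap advice can be made concrete with a small budget calculation. The numbers below (a 60 GB machine and a 4 GB reserve for the OS and other processes) are placeholders, and `native_headroom_gb` is a hypothetical helper. It only illustrates that whatever is given to the TM heap is no longer available for RocksDB's native memory.

```python
def native_headroom_gb(machine_ram_gb, tm_heap_gb, os_reserve_gb=4.0):
    """RAM left for off-heap users such as RocksDB after the TM heap
    and an assumed OS reserve have been subtracted."""
    headroom = machine_ram_gb - tm_heap_gb - os_reserve_gb
    if headroom < 0:
        raise ValueError("heap plus reserve exceeds machine RAM")
    return headroom


# Compare two hypothetical heap sizes on the same 60 GB machine.
for heap_gb in (20, 40):
    print(f"heap {heap_gb} GB -> {native_headroom_gb(60, heap_gb)} GB left off-heap")
```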




Regards,
Vinay Patil

On Tue, Feb 21, 2017 at 8:03 PM, Shai Kaplan [via Apache Flink User Mailing
List archive.] <ml-node+s2336050n11780...@n4.nabble.com> wrote:

> Hi Vinay.
>
>
>
> I couldn't understand from the thread, what configuration solved your
> problem?
>
>
>
> I'm using the default predefined option. Perhaps it's not the best
> configuration for my setting (I'm using Azure DS5_v2 machines), I honestly
> haven't given much thought to that particular detail, but I think it should
> only affect the performance, not make the job totally stuck.
>
>
>
> Thanks.
>
>
>
> *From:* vinay patil [mailto:[hidden email]]
> *Sent:* Tuesday, February 21, 2017 3:58 PM
> *To:* [hidden email]
> *Subject:* Re: Flink checkpointing gets stuck
>
>
>
> Hi Shai,
>
> I was facing a similar issue; however, the stream no longer gets stuck
> midway.
>
> You can refer to this thread for the configuration changes I made:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-td11752.html
>
> What configuration are you running the job with?
> Which RocksDB predefined option are you using?
>
>
> Regards,
>
> Vinay Patil
>
>
>
> On Tue, Feb 21, 2017 at 7:13 PM, Shai Kaplan [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
> Hi.
>
> I'm running a Flink 1.2 job with a 10 seconds checkpoint interval. After
> some running time (minutes-hours) Flink fails to save checkpoints, and
> stops processing records (I'm not sure if the checkpointing failure is the
> cause of the problem or just a symptom).
>
> After several checkpoints that take some seconds each, they start failing
> due to 30 minutes timeout.
>
> When I restart one of the Task Manager services (just to get the job
> restarted), the job is recovered from the last successful checkpoint (the
> state size continues to grow, so it's probably not the reason for the
> failure), advances somewhat, saves some more checkpoints, and then enters
> the failing state again.
>
> One of the times it happened, the first failed checkpoint failed due to
> "Checkpoint Coordinator is suspending.", so it might be an indicator for
> the cause of the problem, but looking into Flink's code I can't see how a
> running job could get to this state.
>
> I am using RocksDB for state, and the state is saved to Azure Blob Store,
> using the NativeAzureFileSystem HDFS connector over the wasbs protocol.
>
> Any ideas? Possibly a bug in Flink or RocksDB?
>
>
