[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-19 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838894#comment-17838894
 ] 

Vadim Vararu commented on FLINK-35115:
--

Great, thanks for the quick fix (y)

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE); // 1-minute interval
> execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 480000);
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTransactionalIdPrefix("test-prefix")
>         .setDeliverGuarantee(EXACTLY_ONCE)
>         .setKafkaProducerConfig(sinkConfig)
>         .setRecordSerializer(
>                 (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>                         new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>         .build(); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
>         "test-stream",
>         new AbstractDeserializationSchema<>() {
>             @Override
>             public ByteBuffer deserialize(byte[] bytes) {
>                 return ByteBuffer.wrap(bytes);
>             }
>         }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a
> stop-with-savepoint, Flink duplicates in Kafka, once the job is resumed, all the
> records between the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink
>  * Event1 is committed to Kafka at the checkpoint
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop-with-savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * Job is resumed from the savepoint
>  * *Event2 is written again to Kafka at the first checkpoint*
>
> I believe that it's a Kinesis connector issue, for two reasons:
>  * I've checked the actual Kinesis sequence number in the _metadata file
> generated at stop-with-savepoint, and it is the one from the checkpoint before
> the savepoint instead of the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of Kinesis,
> and the behaviour does not reproduce.
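For reference, the environment snippets above assemble into a single reproducer along these lines (a sketch: the class wrapper, imports, AWS region property, checkpoint path, and the pass-through map between source and sink are assumptions, not from the report):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KinesisToKafkaReproducer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        execEnv.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");

        // Kinesis source, as in the snippet above; the region is an assumption.
        Properties props = new Properties();
        props.put(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
        FlinkKinesisConsumer<ByteBuffer> source = new FlinkKinesisConsumer<>(
                "test-stream",
                new AbstractDeserializationSchema<ByteBuffer>() {
                    @Override
                    public ByteBuffer deserialize(byte[] bytes) {
                        return ByteBuffer.wrap(bytes);
                    }
                },
                props);

        // Transactional Kafka sink, as in the snippet above.
        Properties sinkConfig = new Properties();
        sinkConfig.put("transaction.timeout.ms", "480000");
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTransactionalIdPrefix("test-prefix")
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(sinkConfig)
                .setRecordSerializer(
                        (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
                                new ProducerRecord<>("test-output-topic", null, element.getBytes()))
                .build();

        execEnv.addSource(source)
                .map(buf -> StandardCharsets.UTF_8.decode(buf).toString())
                .returns(Types.STRING)
                .sinkTo(sink);

        execEnv.execute("kinesis-to-kafka-exactly-once");
    }
}
{code}

To reproduce, one would let a checkpoint complete, publish one more record, and then issue {{flink stop --savepointPath <dir> <jobId>}} before the next checkpoint fires.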





[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854
 ] 

Vadim Vararu edited comment on FLINK-35115 at 4/19/24 4:54 AM:
---

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?


was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth?



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?



[jira] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115 ]


Vadim Vararu deleted comment on FLINK-35115:
--

was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838853#comment-17838853
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?



[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838121#comment-17838121
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] Yes, I can reproduce this consistently.

 

I've enabled this logger:

 
{code:java}
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG {code}
and got these last logs on TM before triggering the stop-with-savepoint (the 
log at 2024-04-17 14:05:11,753 is the last checkpoint):

 

 
{code:java}
2024-04-17 14:05:06,330 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:11,753 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
Snapshotting state ...

2024-04-17 14:05:11,753 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
Snapshotted state, last processed sequence numbers: 
{StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression',
 shardId='shardId-000000000000', parentShardId='null', 
adjacentParentShardId='null', startingHashKey='0', 
endingHashKey='340282366920938463463374607431768211455', 
startingSequenceNumber='49618213417511572504838906841289148356109207047268990978',
 
endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554},
 checkpoint id: 1, timestamp: 1713351911711

2024-04-17 14:05:16,652 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:26,930 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:27,032 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-000000000000, millis behind latest: 0, batch size: 120

2024-04-17 14:05:37,229 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:43,079 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-000000000000, millis behind latest: 0, batch size: 1

2024-04-17 14:05:47,752 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 is trying to discover new shards that were created due to resharding 
...

2024-04-17 14:05:50,677 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
stream: kinesis-dev-1-20210513-v3-contract-impression, shard: 
shardId-000000000000, millis behind latest: 0, batch size: 1{code}
Now I trigger the stop-with-savepoint:
{code:java}
2024-04-17 14:05:52,168 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...

2024-04-17 14:05:52,169 DEBUG 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Cancelled discovery

2024-04-17 14:05:52,169 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...

2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.

2024-04-17 14:05:52,669 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 
...

2024-04-17 14:05:52,670 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Shutting down the shard consumer threads of subtask 0 ...  {code}
And here I start from the savepoint:
{code:java}
2024-04-17 14:12:56,691 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting 
restore state in the FlinkKinesisConsumer. Using the following offsets: 
{org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554}

2024-04-17 14:12:58,370 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 
0 is seeding the fetcher with restored shard 
StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression', 
shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 
0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49618213417511572504838906841289148356109207047268990978,}}'}, starting

[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-16 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837533#comment-17837533
 ] 

Vadim Vararu commented on FLINK-35115:
--

FYI, it's reproducible with an S3 sink (Kinesis -> Flink -> S3) as well.
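For reference, a minimal sketch of such an S3 sink variant (the bucket path and row format are assumptions; like the Kafka sink, FileSink finalizes its pending files on checkpoint/savepoint completion, which is why the same duplication shows up):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

FileSink<String> s3Sink = FileSink
        .forRowFormat(new Path("s3://test-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .build();

// used in place of the KafkaSink: stream.sinkTo(s3Sink);
{code}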



[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-15 Thread Vadim Vararu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vadim Vararu updated FLINK-35115:
-
Summary: Kinesis connector writes wrong Kinesis sequence number at stop 
with savepoint  (was: Kinesis connector writes wrong sequence number at stop 
with savepoint)



[jira] [Created] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint

2024-04-15 Thread Vadim Vararu (Jira)
Vadim Vararu created FLINK-35115:


 Summary: Kinesis connector writes wrong sequence number at stop 
with savepoint
 Key: FLINK-35115
 URL: https://issues.apache.org/jira/browse/FLINK-35115
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: 1.16.3
Reporter: Vadim Vararu




[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong sequence number at stop with savepoint

2024-04-15 Thread Vadim Vararu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vadim Vararu updated FLINK-35115:
-
Affects Version/s: 1.18.1





Lifecycle of a map function

2020-04-07 Thread Vadim Vararu
Hi all,

I'm trying to understand the lifecycle of a map function in a
spark/yarn context. My understanding is that the function is instantiated on the
master and then passed to each executor (serialized/deserialized).

What I'd like to confirm is that the function is
initialized/loaded/deserialized once per executor (JVM in yarn) and lives as
long as the executor lives, and not once per task (the logical unit of work to do).

Could you please explain or, better, give some links to the source code or
documentation? I've tried to take a look at Task.scala and ResultTask.scala, but
I'm not familiar with Scala and didn't find where exactly the function lifecycle
is managed.


Thanks in advance,
Vadim.
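For what it's worth, one way to probe this empirically (a hypothetical sketch, not from the thread) is a map function that logs class loading and instance deserialization separately; comparing the two lines in each executor's logs shows whether deserialization happens once per executor or once per task:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class MapLifecycleProbe {

    static class Probe implements Function<Integer, Integer> {
        static {
            // Runs once per JVM, i.e. at most once per executor.
            System.err.println("Probe class loaded");
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            // Runs each time a serialized copy of the function is deserialized.
            System.err.println("Probe instance deserialized");
        }

        @Override
        public Integer call(Integer v) {
            return v + 1;
        }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("probe"));
        sc.parallelize(Arrays.asList(1, 2, 3, 4), 4).map(new Probe()).collect();
        sc.close();
    }
}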


Re: Re:Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-06-28 Thread Vadim Vararu
Hi,

I've run it on a standalone Flink cluster. No Yarn involved.

From: Haibo Sun 
Sent: Friday, June 28, 2019 6:13 AM
To: Vadim Vararu
Cc: user@flink.apache.org
Subject: Re:Flink batch job memory/disk leak when invoking set method on a 
static Configuration object.

Hi, Vadim

This similar issue has occurred in earlier versions, see 
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug. 
I'll see if I can reproduce it with the master branch code, and if yes, I will 
try to investigate it.

If someone already knows the cause of the problem, that's the best; it won't 
need to be re-investigated.

Best,
Haibo


At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:
Hi guys,

I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

public static void main(String[] args) throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
.map(line -> line + "dd")
.write(new HadoopUsageOutputFormat(), "file:///tmp/out");

env.execute(JobHadoop.class.getName());
}

}

public class HadoopUsageOutputFormat extends FileOutputFormat<String> 
implements OutputFormat<String> {

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
public static final String DEFAULT_LINE_DELIMITER = "\n";

private Writer writer;

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
writer = new OutputStreamWriter(new BufferedOutputStream(stream));
}

@Override
public void writeRecord(String record) throws IOException {
writer.write(record);
writer.write(DEFAULT_LINE_DELIMITER);
}

@Override
public void close() throws IOException {
if (writer != null) {
this.writer.flush();
this.writer.close();
}

super.close();
}
}

The problem is that after the job is finished, there is a memory leak somewhere 
that does not permit the job's blobStore files to be deleted. The number of such 
"deleted" files increases after each job run. Even though they are marked as 
deleted, a reference to the file is kept somewhere in the JobManager process 
that prevents actual deletion.



Also, the problem reproduces only if I actually invoke the set method of 
Configuration:

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

From my observations, if I change the

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();

to a non-static field, then the problem does not reproduce any more.
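In other words, a variant along these lines (a sketch of what that observation implies; writeRecord and close stay as in the original format) ties the Configuration's lifetime to the task instead of the class:

public class HadoopUsageOutputFormat extends FileOutputFormat<String>
        implements OutputFormat<String> {

    public static final String DEFAULT_LINE_DELIMITER = "\n";

    // Created per task in open() rather than held in a static field, so the
    // Configuration (and any classloader reference it pins) is released when
    // the task finishes.
    private transient Configuration hadoopConfiguration;
    private transient Writer writer;

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        hadoopConfiguration = new Configuration();
        hadoopConfiguration.set("just.a.prop", "/var/test1");
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    // writeRecord(...) and close() unchanged from the original post
}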


However, I'm interested if that's a normal behaviour or a bug/leak somewhere in 
Flink itself.

Thanks,
Vadim.





Broadcast state before events stream consumption

2019-02-08 Thread Vadim Vararu
Hi all,

I need to use the broadcast state mechanism 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
 for the next scenario.

I have a reference data stream (slow) and an events stream (fast running) and I 
want to do a kind of lookup in the reference stream for each
event. The broadcast state mechanism seems to fit perfect the scenario.

From the documentation:
As an example where broadcast state can emerge as a natural fit, one can 
imagine a low-throughput stream containing a set of rules which we want to 
evaluate against all elements coming from another stream.

However, I am not sure what the correct way is to delay the consumption of the 
fast-running stream until the slow one is fully read (in the case of a file) or 
until a marker is emitted (in the case of some other source). Is there any way to 
accomplish that? It doesn't seem to be a rare use case.

Thanks, Vadim.
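For reference, the workaround commonly used (a hedged sketch, not an official recipe; Flink has no built-in way to pause one input) is to buffer events inside the broadcast function until the reference side signals completeness, e.g. with a marker element:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilReferenceReady {

    // Reference entries arrive as "key=value" strings; "END_OF_REFERENCE" is an
    // assumed marker emitted by the reference source once it is fully read.
    public static DataStream<String> enrich(DataStream<String> events, DataStream<String> reference) {
        MapStateDescriptor<String, String> refDescriptor =
                new MapStateDescriptor<>("reference", Types.STRING, Types.STRING);
        BroadcastStream<String> refBroadcast = reference.broadcast(refDescriptor);

        return events.connect(refBroadcast).process(
                new BroadcastProcessFunction<String, String, String>() {

                    // Plain in-memory buffer for readability; a fault-tolerant
                    // job would keep this in operator state instead.
                    private final List<String> buffered = new ArrayList<>();
                    private boolean referenceReady = false;

                    @Override
                    public void processBroadcastElement(String entry, Context ctx,
                            Collector<String> out) throws Exception {
                        if ("END_OF_REFERENCE".equals(entry)) {
                            referenceReady = true;
                            for (String event : buffered) {
                                out.collect(event + " -> " + ctx.getBroadcastState(refDescriptor).get(event));
                            }
                            buffered.clear();
                        } else {
                            String[] kv = entry.split("=", 2);
                            ctx.getBroadcastState(refDescriptor).put(kv[0], kv[1]);
                        }
                    }

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx,
                            Collector<String> out) throws Exception {
                        if (referenceReady) {
                            out.collect(event + " -> " + ctx.getBroadcastState(refDescriptor).get(event));
                        } else {
                            buffered.add(event);
                        }
                    }
                });
    }
}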


Flink streaming. Broadcast reference data map across nodes

2017-02-21 Thread Vadim Vararu

Hi all,


I would like to do something similar to Spark's broadcast mechanism.

Basically, I have a big dictionary of reference data that has to be 
accessible from all the nodes (in order to join each log line 
with a reference line).

I have not yet found a way to do it.


Any ideas?



Re: Is it OK to have very many session windows?

2017-02-20 Thread Vadim Vararu

It's something like:

DataStreamSource stream = env.addSource(getKafkaConsumer(parameterTool));
stream
    .map(getEventToDomainMapper())
    .keyBy(getKeySelector())
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
    .reduce(getReducer())
    .map(getToJsonMapper())
    .addSink(getKafkaProducer(parameterTool));


Each new event may be reduced against the existing state of the 
window, so normally it's okay to have only one row per window in memory.

I'm new to Flink and have not yet reached the "incremental aggregates" 
part but, if I understand correctly, it fits my case.


Vadim.



On 20.02.2017 17:47, Timo Walther wrote:

Hi Vadim,

this of course depends on your use case. The question is how large is 
your state per pane and how much memory is available for Flink?
Are you using incremental aggregates such that only the aggregated 
value per pane has to be kept in memory?


Regards,
Timo


On 20/02/17 at 16:34, Vadim Vararu wrote:

Hi guys,

Is it okay to have very many (tens of thousands or hundreds of 
thousands) of session windows?



Thanks, Vadim.
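For context, the .reduce(...) above is already the incremental path Timo refers to: Flink folds each arriving event into one accumulated value per key and session window, so per-window state stays constant-size no matter how long the session runs. A small self-contained sketch of that pattern (types and values are illustrative, not from the thread):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("user-1", 3L), Tuple2.of("user-1", 7L), Tuple2.of("user-2", 1L));

        // Incremental aggregation: only one running value is kept per session
        // window and key, not the full list of buffered events.
        events.keyBy(value -> value.f0)
                .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
                .reduce((a, b) -> a.f1 >= b.f1 ? a : b)
                .print();

        env.execute("session-reduce");
    }
}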









ImportTSV write to remote HDFS concurrently.

2016-10-21 Thread Vadim Vararu

Hi guys,

I'm trying to run the importTSV job and write the result into a 
remote HDFS. Isn't it supposed to write data concurrently? I'm asking because 
I get the same time with 2 and 4 nodes, and I can see that there is only 
1 reduce task running.

Where is the bottleneck?

Thanks, Vadim.
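One commonly cited cause (an assumption here; the thread itself does not confirm it) is that ImportTSV's bulk-load reduce phase runs one reducer per region of the target table, so a freshly created single-region table serializes the write. Pre-splitting the table raises the parallelism, e.g.:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor table = new HTableDescriptor(TableName.valueOf("import_target"));
        table.addFamily(new HColumnDescriptor("cf"));

        // Hypothetical split points; choose them to match the key distribution
        // of the TSV data so regions (and therefore reducers) are balanced.
        byte[][] splits = {
                Bytes.toBytes("25000000"),
                Bytes.toBytes("50000000"),
                Bytes.toBytes("75000000"),
        };
        admin.createTable(table, splits);
        admin.close();
    }
}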




Re: Is JavaSparkContext.wholeTextFiles distributed?

2016-04-26 Thread Vadim Vararu
Spark can create distributed datasets from any storage source supported 
by Hadoop, including your local file system, HDFS, Cassandra, HBase, 
Amazon S3, etc. Spark supports text files, SequenceFiles, 
and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext's textFile method. 
This method takes a URI for the file (either a local path on the 
machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection 
of lines.

I could not find a concrete statement saying whether reading 
more than one file is distributed or not.

On 26.04.2016 18:00, Hyukjin Kwon wrote:

then this would not be distributed




Is JavaSparkContext.wholeTextFiles distributed?

2016-04-26 Thread Vadim Vararu

Hi guys,

I'm trying to read many files from S3 using 
JavaSparkContext.wholeTextFiles(...). Is that executed in a distributed 
manner? Please give me a link to the place in the documentation where this 
is specified.


Thanks, Vadim.
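For what it's worth, a hedged sketch of the call in question (bucket path and partition count are assumptions): wholeTextFiles distributes the set of files across partitions and tasks, while each individual file is read whole by a single task; the optional minPartitions argument influences how many input partitions are created.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WholeTextFilesExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("wtf-example"));

        // Each record is (fileName, fileContent). The files are grouped into
        // partitions, and the partitions are processed in parallel by tasks
        // running on the executors.
        JavaPairRDD<String, String> files =
                sc.wholeTextFiles("s3n://my-bucket/input/", 32);

        System.out.println("partitions: " + files.partitions().size());
        sc.close();
    }
}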




Pig 0.11.1 blocks at: Connecting to hadoop file system at: file:///

2016-04-25 Thread Vadim Vararu

Hi,

Trying to run Pig in local mode:

/tmp/pig-0.11.1/bin/pig -x local -m /tmp/pig.properties -f "/tmp/pig-scripts/local/pig_script.pig"

it blocks at:

2016-04-25 08:46:55,947 [main] INFO 
org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - 
Connecting to hadoop file system at: file:///

and stays at this line indefinitely.

The problem occurs on Pig 0.11.1 but does not reproduce on 0.12.0. It does not 
print anything in the logs or console, so I don't have a clue why it happens. 
Any ideas?


Thanks, Vadim.




Re: Exception in the bean after onException

2014-11-05 Thread Vadim Vararu
Solved; it was an issue with the Throwable object, not a Camel one. Thanks.

On Tue, Nov 4, 2014 at 9:27 AM, Vadim Vararu vararu.va...@gmail.com wrote:

 Yes, that works, but as I've observed, this way I get only the cause
 message and not the whole stacktrace.

 On Mon, Nov 3, 2014 at 6:55 PM, Claus Ibsen claus.ib...@gmail.com wrote:

 Hi

  This is by design. As onException routes the exchange, the
  exchange must not have the exception in getException, as that is
  used to detect whether a new exception is thrown during routing with
  onException.

 So you can grab the caused exception from the exchange property, or if
 you use a bean instead of .process, then just define an Exception type
 in the method signature and Camel will provide the caused exception in
 the parameter.

 On Mon, Nov 3, 2014 at 4:53 PM, Vadim Vararu vararu.va...@gmail.com
 wrote:
  Hi,
 
  I have a global error handler that passes the exchange to a processor.
  I wonder why the exception attribute of the injected Exchange is null in
  this case?
 
  //GLOBAL ERROR MANAGEMENT
  onException(Throwable.class)
  .process(new GlobalLoggerProcessor());
 
   I can find the message of the exception in a property of the exchange, but
   the exception itself is NULL.
 
  Thanks in advance, Vadim.



 --
 Claus Ibsen
 -
 Red Hat, Inc.
 Email: cib...@redhat.com
 Twitter: davsclaus
 Blog: http://davsclaus.com
 Author of Camel in Action: http://www.manning.com/ibsen
 hawtio: http://hawt.io/
 fabric8: http://fabric8.io/






[Stripes-users] welcome page

2009-03-10 Thread Vadim Vararu
Hi everybody! How could I set the welcome page to be not a .jsp but an
action, dispatched by the Stripes dispatcher?

I tried this:

<welcome-file-list>
    <welcome-file>/Default.action</welcome-file>
</welcome-file-list>

So, I have in the default package a Default class that implements ActionBean and
has a default handler method, but it does not work :(
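For reference, the workaround usually applied when the container insists that the welcome file be a real resource (an assumption here; the thread itself does not resolve it) is a one-line index.jsp that forwards to the action:

<%-- index.jsp --%>
<jsp:forward page="/Default.action"/>

<!-- web.xml -->
<welcome-file-list>
    <welcome-file>index.jsp</welcome-file>
</welcome-file-list>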