[
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16811681#comment-16811681
]
Ted Yu commented on KAFKA-5998:
-------------------------------
I made the following change:
{code}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index badaa36cd..be29ebe56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -138,6 +138,7 @@ public abstract class AbstractJoinIntegrationTest {
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ STREAMS_CONFIG.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE,
true);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
{code}
For the following check in KafkaStreams#start:
{code}
if (state == State.RUNNING) {
stateDirectory.cleanRemovedTasks(cleanupDelay);
{code}
The state was REBALANCING (not RUNNING) during TableTableJoinIntegrationTest,
so the cleanup was not triggered.
> /.checkpoint.tmp Not found exception
> ------------------------------------
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
> Reporter: Yogesh BG
> Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt,
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams application running. It has been
> running for two days under a load of around 2500 msgs per second. On the third
> day I started getting the exception below for some of the partitions. I have 16
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException:
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
> ~[na:1.7.0_111]
> at
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
> ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
> ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException:
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
> ~[na:1.7.0_111]
> at
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
> ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
> ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> }}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)