[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r569888405

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
         final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
+
+        // use client id instead of thread client id since this admin client may be shared among threads
         this.clientSupplier = clientSupplier;
-        final MetricConfig metricConfig = new MetricConfig()
-            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
-        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-            MetricsReporter.class,
-            Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
-        final JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(config.originals());
-        reporters.add(jmxReporter);
-        final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
-            config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
-        metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+        adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+
+        log.info("Kafka Streams version: {}", ClientMetrics.version());
+        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
+
+        metrics = getMetrics(config, time, clientId);
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
             config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             time
         );
+
         ClientMetrics.addVersionMetric(streamsMetrics);
         ClientMetrics.addCommitIdMetric(streamsMetrics);
         ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
-        log.info("Kafka Streams version: {}", ClientMetrics.version());
-        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-        this.internalTopologyBuilder = internalTopologyBuilder;
-        // re-write the physical topology according to the config
-        internalTopologyBuilder.rewriteTopology(config);
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->

Review comment:
Ah, sorry, I did, but it got covered up when I pushed some changes: https://github.com/apache/kafka/pull/9978#discussion_r565746851

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r567013928

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);
+        } catch (final IOException e) {
+            log.error("Unable to lock the state directory due to unexpected exception", e);
+            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+        }
+
+        return stateDirLock != null;
+    }
+
+    public UUID initializeProcessId() {

Review comment:
Alright, done.
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566576655

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);
+        } catch (final IOException e) {
+            log.error("Unable to lock the state directory due to unexpected exception", e);
+            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+        }
+
+        return stateDirLock != null;
+    }
+
+    public UUID initializeProcessId() {

Review comment:
Oh my god. I hate JSON lol
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566559281

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);
+        } catch (final IOException e) {
+            log.error("Unable to lock the state directory due to unexpected exception", e);
+            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+        }
+
+        return stateDirLock != null;
+    }
+
+    public UUID initializeProcessId() {

Review comment:
I'll try it out with JSON. If we do use JSON, then we don't even need the version number, right? As long as we only ever add fields, it should always be compatible. At least that's my understanding; I'm not a JSON expert.
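The "only ever add fields" argument for a self-describing format can be illustrated with a small sketch. It uses `java.util.Properties` as the key/value format instead of JSON. That substitution keeps the example JDK-only and is not what the PR uses. The key names `processId` and `hostname` are hypothetical. A reader that looks up only the key it knows ignores keys added later, so no version number is needed as long as existing fields are never renamed or repurposed.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.UUID;

// Sketch only: java.util.Properties stands in for JSON as a self-describing key/value format.
// The key names are hypothetical, not the PR's actual schema.
final class SelfDescribingProcessFile {

    private static String store(final Properties props) {
        final StringWriter out = new StringWriter();
        try {
            props.store(out, null);
        } catch (final IOException e) {
            throw new UncheckedIOException(e); // cannot happen for an in-memory StringWriter
        }
        return out.toString();
    }

    // "Old" writer: knows only about the process id.
    static String writeV0(final UUID processId) {
        final Properties props = new Properties();
        props.setProperty("processId", processId.toString());
        return store(props);
    }

    // "Newer" writer: adds a hypothetical extra field without bumping any version number.
    static String writeWithExtraField(final UUID processId, final String hostname) {
        final Properties props = new Properties();
        props.setProperty("processId", processId.toString());
        props.setProperty("hostname", hostname);
        return store(props);
    }

    // "Old" reader: looks up only the key it knows; any other keys are ignored.
    static UUID readProcessId(final String contents) {
        final Properties props = new Properties();
        try {
            props.load(new StringReader(contents));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        final String value = props.getProperty("processId");
        if (value == null) {
            throw new IllegalArgumentException("No processId entry found");
        }
        return UUID.fromString(value);
    }
}
```

With JSON the same property depends on the parser being configured to ignore unknown fields, which is worth checking for whichever library is used.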
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566550049

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);
+        } catch (final IOException e) {
+            log.error("Unable to lock the state directory due to unexpected exception", e);
+            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+        }
+
+        return stateDirLock != null;
+    }
+
+    public UUID initializeProcessId() {
+        if (!hasPersistentStores) {
+            return UUID.randomUUID();
+        }
+
+        if (!lockStateDirectory()) {
+            log.error("Unable to obtain lock as state directory is already locked by another process");
+            throw new StreamsException("Unable to initialize state, this can happen if multiple instances of " +
+                "Kafka Streams are running in the same state directory");
+        }
+
+        final File processFile = new File(stateDir, PROCESS_FILE_NAME);
+        try {
+            if (processFile.exists()) {
+                try (final BufferedReader reader = Files.newBufferedReader(processFile.toPath())) {
+                    // only field in version 0 is the UUID
+                    final int version = Integer.parseInt(reader.readLine());
+                    if (version > 0) {

Review comment:
Oh yeah, definitely. Thanks!
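The load-or-create flow in `initializeProcessId` can be sketched with plain JDK file I/O. The sketch makes several assumptions:

- It uses the version 0 layout the diff reads: the version number on the first line, then the UUID, with any later fields appended after it.
- The caller already holds the state directory lock.
- `ProcessIdFile` and `loadOrCreate` are hypothetical names.

The excerpt cuts off before the PR's handling of `version > 0`, so this sketch simply reads a newer file's UUID. That relies on fields only ever being appended.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical sketch of a versioned, line-based process file:
// line 1 = schema version, line 2 = UUID, newer fields appended after that.
final class ProcessIdFile {
    static final int CURRENT_VERSION = 0;

    private static String readRequiredLine(final BufferedReader reader) throws IOException {
        final String line = reader.readLine();
        if (line == null) {
            throw new IllegalStateException("Truncated process file");
        }
        return line.trim();
    }

    /** Returns the persisted UUID, creating and persisting a new one if the file is absent. */
    static UUID loadOrCreate(final Path processFile) {
        try {
            if (Files.exists(processFile)) {
                try (BufferedReader reader = Files.newBufferedReader(processFile, StandardCharsets.UTF_8)) {
                    final int version = Integer.parseInt(readRequiredLine(reader));
                    if (version < 0) {
                        throw new IllegalStateException("Invalid process file version " + version);
                    }
                    // Fields are only ever appended, so the UUID stays on line 2 in every
                    // version; trailing fields from newer versions are simply not read.
                    return UUID.fromString(readRequiredLine(reader));
                }
            }
            final UUID processId = UUID.randomUUID();
            try (BufferedWriter writer = Files.newBufferedWriter(processFile, StandardCharsets.UTF_8)) {
                writer.write(Integer.toString(CURRENT_VERSION));
                writer.newLine();
                writer.write(processId.toString());
                writer.newLine();
            }
            return processId;
        } catch (final IOException e) {
            throw new UncheckedIOException("Failed to read or write " + processFile, e);
        }
    }
}
```

Calling `loadOrCreate` twice on the same path returns the same UUID. This is the stable-across-restarts property the PR is after.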
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566549210

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
     }
     private boolean close(final long timeoutMs) {
-        if (state == State.ERROR) {
-            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+        if (state == State.ERROR || state == State.NOT_RUNNING) {
+            log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
             return true;
         }
-        if (state == State.PENDING_ERROR) {
-            log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped.");
-            if (waitOnState(State.ERROR, timeoutMs)) {
+        if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) {
+            log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
+            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
                 log.info("Streams client stopped to ERROR completely");
                 return true;
+            } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
+                log.info("Streams client stopped to NOT_RUNNING completely");
+                return true;
             } else {
-                log.info("Streams client cannot transition to ERROR completely within the timeout");
+                log.warn("Streams client cannot transition to {}} completely within the timeout", state);

Review comment:
Ah, good catch.
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566379864

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);

Review comment:
Nope, we just hold it until the KafkaStreams instance is closed.
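Holding the lock for the client's whole lifetime can be sketched as an `AutoCloseable` that keeps the `FileChannel` open until `close()`. `StateDirLockHolder` is a hypothetical class, not the PR's `StateDirectory`; only the `.lock` file name comes from the diff. The sketch also shows why two instances pointed at the same state directory cannot coexist: while the first holds the lock, a second attempt is rejected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: lock a state directory on creation and hold it until close().
final class StateDirLockHolder implements AutoCloseable {
    // Directories locked by this JVM. Same-JVM duplicates are rejected here, before a second
    // channel is opened: per the FileLock Javadoc, on some platforms closing any channel on a
    // file can release all of the JVM's locks on that file.
    private static final Set<Path> HELD_IN_THIS_JVM = ConcurrentHashMap.newKeySet();

    private final Path stateDir;
    private final FileChannel channel; // stays open (and locked) until close()

    private StateDirLockHolder(final Path stateDir, final FileChannel channel) {
        this.stateDir = stateDir;
        this.channel = channel;
    }

    /** Returns a holder that owns the lock, or null if the directory is already locked. */
    static StateDirLockHolder tryAcquire(final Path dir) {
        final Path stateDir = dir.toAbsolutePath().normalize();
        if (!HELD_IN_THIS_JVM.add(stateDir)) {
            return null; // another instance in this JVM already holds it
        }
        FileChannel channel = null;
        try {
            channel = FileChannel.open(stateDir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            final FileLock lock = channel.tryLock(); // null if another process holds the lock
            if (lock == null) {
                channel.close();
                HELD_IN_THIS_JVM.remove(stateDir);
                return null;
            }
            return new StateDirLockHolder(stateDir, channel);
        } catch (final IOException e) {
            HELD_IN_THIS_JVM.remove(stateDir);
            if (channel != null) {
                try {
                    channel.close();
                } catch (final IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw new UncheckedIOException("Failed to lock state directory " + stateDir, e);
        }
    }

    /** Closing the channel also releases the FileLock acquired through it. */
    @Override
    public void close() {
        try {
            channel.close();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            HELD_IN_THIS_JVM.remove(stateDir);
        }
    }
}
```

The static set is a design choice of this sketch. It avoids relying on `OverlappingFileLockException` to detect same-JVM duplicates, because that approach opens, and then has to close, a second channel on an already-locked file.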
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565806577

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ##
@@ -112,6 +118,19 @@ public void createTopics() throws Exception {
         CLUSTER.createTopic(outputTopic, 1, 3);
     }
+    @After
+    public void cleanUp() {
+        if (streamInstanceOne != null) {
+            streamInstanceOne.close();
+        }
+        if (streamInstanceTwo != null) {
+            streamInstanceTwo.close();
+        }
+        if (streamInstanceOneRecovery != null) {
+            streamInstanceOneRecovery.close();
+        }

Review comment:
There are no logical changes to this test. I just had to refactor it a bit because we were creating two copies of the same KafkaStreams at the same time (with the same app.dir & state.dir), even though one of them wasn't started until much later. Since we now do the state initialization inside the KafkaStreams constructor, that no longer works.
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565805758

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -416,11 +524,15 @@ private void cleanRemovedTasksCalledByUser() throws Exception {
                         logPrefix(), dirName, id),
                     exception
                 );
-                throw exception;

Review comment:
The IDE was giving me a warning.
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565803134

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -54,14 +61,27 @@ private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
     static final String LOCK_FILE_NAME = ".lock";
+    /* The process file is used to persist the process id across restarts.
+     * The version 0 schema consists only of the version number and UUID
+     *
+     * If you need to store additional metadata of the process you can bump the version number and append new fields.
+     * For compatibility reasons you should only ever add fields, and only by appending them to the end
+     */
+    private static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
+    private static final int PROCESS_FILE_VERSION = 0;

Review comment:
No idea if we'll ever want to add anything else to this file, but better to be safe and forward-compatible than sad.
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802766

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
     }
     private boolean close(final long timeoutMs) {
-        if (state == State.ERROR) {
-            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+        if (state == State.ERROR || state == State.NOT_RUNNING) {

Review comment:
Something I noticed during testing: I feel it makes sense for the handling of ERROR and NOT_RUNNING to parallel each other (same for the PENDING_ flavors). This is a slight change in behavior: now, if a user calls `close()` while the instance is already closing, it will wait (up to the timeout) for the ongoing shutdown to complete before returning.
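The parallel handling described above can be sketched with a small, self-contained state holder. This is a simplified stand-in, not the KafkaStreams implementation. The `State` names mirror the diff, but `CloseSketch` and its monitor-based `waitOnState` are hypothetical. `close()` reads the state once into a local variable so that every branch acts on the same observed state.

```java
// Simplified, hypothetical sketch of the close() branching discussed above.
final class CloseSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private State state = State.RUNNING;

    synchronized void setState(final State newState) {
        state = newState;
        notifyAll(); // wake any close() call waiting for a terminal state
    }

    synchronized State state() {
        return state;
    }

    /** Waits until targetState is reached; returns false if the timeout elapses first. */
    synchronized boolean waitOnState(final State targetState, final long timeoutMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (state != targetState) {
            final long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }

    /** Returns true if the client is, or becomes, fully stopped within the timeout. */
    boolean close(final long timeoutMs) throws InterruptedException {
        final State current = state();
        if (current == State.ERROR || current == State.NOT_RUNNING) {
            return true; // already terminal: nothing left to do
        }
        if (current == State.PENDING_ERROR) {
            return waitOnState(State.ERROR, timeoutMs); // someone else is already shutting down
        }
        if (current == State.PENDING_SHUTDOWN) {
            return waitOnState(State.NOT_RUNNING, timeoutMs);
        }
        // RUNNING: a real client would start shutting down its threads here;
        // this sketch just completes the transition immediately.
        setState(State.PENDING_SHUTDOWN);
        setState(State.NOT_RUNNING);
        return true;
    }
}
```

With this shape, a second `close()` issued during an ongoing shutdown blocks until the first one finishes or the timeout elapses, instead of returning immediately.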
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
+    private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            MetricsReporter.class,
+            Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
+        final JmxReporter jmxReporter = new JmxReporter();
+        jmxReporter.configure(config.originals());
+        reporters.add(jmxReporter);
+        final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+            config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        return new Metrics(metricConfig, reporters, time, metricsContext);
+    }
+
+    private int getNumStreamThreads(final boolean hasGlobalTopology) {
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Topology with no input topics will create no stream threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no global threads, " +
+                "must subscribe to at least one source topic or global table.");
+        }
+        return numStreamThreads;

Review comment:
I just tried to factor some of the self-contained logic into helper methods, since I found it incredibly difficult to get oriented within the super-long KafkaStreams constructor.
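One benefit of pulling this logic into a helper is that it can be reasoned about, and tested, on its own. The sketch below restates the same decision as a pure function. The parameter names are hypothetical stand-ins for three inputs:

- `internalTopologyBuilder.hasNoNonGlobalTopology()`
- the `num.stream.threads` setting (`StreamsConfig.NUM_STREAM_THREADS_CONFIG`)
- `hasGlobalTopology`

To stay JDK-only, the sketch throws `IllegalStateException` where the diff throws Kafka's `TopologyException`.

```java
// Hypothetical pure-function restatement of getNumStreamThreads from the diff above.
final class StreamThreadCount {
    static int numStreamThreads(final boolean globalOnlyTopology,
                                final int configuredThreads,
                                final boolean hasGlobalTopology) {
        // A global-only topology gets no StreamThreads, regardless of the configured count.
        final int numStreamThreads = globalOnlyTopology ? 0 : configuredThreads;
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            // Nothing would ever run: no stream threads and no global thread.
            throw new IllegalStateException("Topology has no stream threads and no global threads, "
                + "must subscribe to at least one source topic or global table.");
        }
        return numStreamThreads;
    }
}
```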
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565801932

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final Time time) throws StreamsException {
         this.config = config;
         this.time = time;
+
+        this.internalTopologyBuilder = internalTopologyBuilder;
+        internalTopologyBuilder.rewriteTopology(config);
+
+        // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+        taskTopology = internalTopologyBuilder.buildTopology();
+        globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+            (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
+
+        try {
+            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
+            processId = stateDirectory.initializeProcessId();

Review comment:
This is the only logical change in the KafkaStreams constructor; the rest of the diff comes from moving things around so that everything is initialized in the proper order.
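The ordering matters because the process id depends on the topology. Whether any store is persistent must be known before deciding between a fresh random UUID and one persisted in the state directory. (`initializeProcessId` returns `UUID.randomUUID()` when there are no persistent stores.) A minimal sketch of that dependency is below. `ProcessIdChoice` and its parameters are hypothetical, and the persisted-id lookup is passed in as a `Supplier`.

```java
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical sketch: the process id depends on the topology, so the topology is built first.
final class ProcessIdChoice {
    // Mirrors the hasPersistentStores expression in the constructor diff above.
    static boolean hasPersistentStores(final boolean taskTopologyHasPersistentLocalStore,
                                       final boolean hasGlobalTopology,
                                       final boolean globalTopologyHasPersistentGlobalStore) {
        return taskTopologyHasPersistentLocalStore
            || (hasGlobalTopology && globalTopologyHasPersistentGlobalStore);
    }

    static UUID chooseProcessId(final boolean hasPersistentStores, final Supplier<UUID> loadOrCreatePersistedId) {
        // Without persistent stores there is no on-disk state worth tying to a stable id.
        return hasPersistentStores ? loadOrCreatePersistedId.get() : UUID.randomUUID();
    }
}
```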
ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565746851

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -782,8 +783,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final Time time) throws StreamsException {
         this.config = config;
         this.time = time;
+
+        this.internalTopologyBuilder = internalTopologyBuilder;
+        internalTopologyBuilder.rewriteTopology(config);
+
+        // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+        taskTopology = internalTopologyBuilder.buildTopology();
+        globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+            (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
+
+        try {
+            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
+            processId = stateDirectory.getProcessId();

Review comment:
This is the only real change in the constructor, but I had to move a few things around and tried to organize them as I went.