[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-02-03 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r569888405



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -798,73 +817,42 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 }
 final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
 this.log = logContext.logger(getClass());
+
+// use client id instead of thread client id since this admin client 
may be shared among threads
 this.clientSupplier = clientSupplier;
-final MetricConfig metricConfig = new MetricConfig()
-.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS);
-final List<MetricsReporter> reporters = 
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-MetricsReporter.class,
-Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, 
clientId));
-final JmxReporter jmxReporter = new JmxReporter();
-jmxReporter.configure(config.originals());
-reporters.add(jmxReporter);
-final MetricsContext metricsContext = new 
KafkaMetricsContext(JMX_PREFIX,
-
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
-metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+
+log.info("Kafka Streams version: {}", ClientMetrics.version());
+log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
+
+metrics = getMetrics(config, time, clientId);
 streamsMetrics = new StreamsMetricsImpl(
 metrics,
 clientId,
 config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
 time
 );
+
 ClientMetrics.addVersionMetric(streamsMetrics);
 ClientMetrics.addCommitIdMetric(streamsMetrics);
 ClientMetrics.addApplicationIdMetric(streamsMetrics, 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
 ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
internalTopologyBuilder.describe().toString());
 ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state);
-log.info("Kafka Streams version: {}", ClientMetrics.version());
-log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-this.internalTopologyBuilder = internalTopologyBuilder;
-// re-write the physical topology according to the config
-internalTopologyBuilder.rewriteTopology(config);
+ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->

Review comment:
   Ah sorry, I did but it got covered up when I pushed some changes: 
https://github.com/apache/kafka/pull/9978#discussion_r565746851





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-29 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r567013928



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {

Review comment:
   alright done









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566576655



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {

Review comment:
   Oh my god. I hate json lol









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566559281



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {

Review comment:
   I'll try it out with JSON. If we do use JSON, then we don't even need 
the version number, right? As long as we only ever add fields, it should 
always be compatible. At least that's my understanding; I'm not a JSON expert









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566550049



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {
+if (!hasPersistentStores) {
+return UUID.randomUUID();
+}
+
+if (!lockStateDirectory()) {
+log.error("Unable to obtain lock as state directory is already 
locked by another process");
+throw new StreamsException("Unable to initialize state, this can 
happen if multiple instances of " +
+   "Kafka Streams are running in the 
same state directory");
+}
+
+final File processFile = new File(stateDir, PROCESS_FILE_NAME);
+try {
+if (processFile.exists()) {
+try (final BufferedReader reader = 
Files.newBufferedReader(processFile.toPath())) {
+// only field in version 0 is the UUID
+final int version = Integer.parseInt(reader.readLine());
+if (version > 0) {

Review comment:
   Oh yeah definitely, thanks









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566549210



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {
-if (state == State.ERROR) {
-log.info("Streams client is already in the terminal state ERROR, 
all resources are closed and the client has stopped.");
+if (state == State.ERROR || state == State.NOT_RUNNING) {
+log.info("Streams client is already in the terminal {} state, all 
resources are closed and the client has stopped.", state);
 return true;
 }
-if (state == State.PENDING_ERROR) {
-log.info("Streams client is in PENDING_ERROR, all resources are 
being closed and the client will be stopped.");
-if (waitOnState(State.ERROR, timeoutMs)) {
+if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) {
+log.info("Streams client is in {}, all resources are being closed 
and the client will be stopped.", state);
+if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
 log.info("Streams client stopped to ERROR completely");
 return true;
+} else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+log.info("Streams client stopped to NOT_RUNNING completely");
+return true;
 } else {
-log.info("Streams client cannot transition to ERROR completely 
within the timeout");
+log.warn("Streams client cannot transition to {}} completely 
within the timeout", state);

Review comment:
   Ah good catch









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566379864



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);

Review comment:
   Nope, we just hold it until the KafkaStreams is closed
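For illustration, holding a directory lock for the lifetime of its owner could look roughly like the sketch below. The class `DirectoryLockSketch` and its methods are hypothetical and not part of the PR; only `FileChannel.open`, `tryLock`, and the `.lock` file name are taken from the diff.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: the lock is taken once and only released in close().
final class DirectoryLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    private DirectoryLockSketch(final FileChannel channel, final FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    // Returns null if the directory is already locked, by this JVM or another process.
    static DirectoryLockSketch tryAcquire(final Path dir) throws IOException {
        final FileChannel channel = FileChannel.open(
            dir.resolve(".lock"), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock;
        try {
            lock = channel.tryLock();
        } catch (final OverlappingFileLockException e) {
            lock = null; // this JVM already holds a lock on the file
        } catch (final IOException e) {
            channel.close();
            throw e;
        }
        if (lock == null) {
            channel.close();
            return null;
        }
        return new DirectoryLockSketch(channel, lock);
    }

    // Called by the owner when it shuts down; nothing releases the lock earlier.
    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
```

One caveat from the `FileLock` documentation: on some systems, closing any channel on a file releases every lock the JVM holds on that file, so production code is better off routing all locking of a given file through a single channel.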









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565806577



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##
@@ -112,6 +118,19 @@ public void createTopics() throws Exception {
 CLUSTER.createTopic(outputTopic, 1, 3);
 }
 
+@After
+public void cleanUp() {
+if (streamInstanceOne != null) {
+streamInstanceOne.close();
+}
+if (streamInstanceTwo != null) {
+streamInstanceTwo.close();
+}
+if (streamInstanceOneRecovery != null) {
+streamInstanceOneRecovery.close();
+}

Review comment:
   There are no logical changes to this test, I just had to refactor it a 
bit because we were creating two copies of the same KafkaStreams at the same 
time (with the same app.dir & state.dir), even though one of them wasn't 
started until much later. Since we do the state initialization inside the 
KafkaStreams constructor, this was no good









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565805758



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -416,11 +524,15 @@ private void cleanRemovedTasksCalledByUser() throws 
Exception {
 logPrefix(), dirName, id),
 exception
 );
-throw exception;

Review comment:
   IDE was giving me a warning









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565803134



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -54,14 +61,27 @@
 private static final Logger log = 
LoggerFactory.getLogger(StateDirectory.class);
 static final String LOCK_FILE_NAME = ".lock";
 
+/* The process file is used to persist the process id across restarts.
+ * The version 0 schema consists only of the version number and UUID
+ *
+ * If you need to store additional metadata of the process you can bump 
the version number and append new fields.
+ * For compatibility reasons you should only ever add fields, and only by 
appending them to the end
+ */
+private static final String PROCESS_FILE_NAME = 
"kafka-streams-process-metadata";
+private static final int PROCESS_FILE_VERSION = 0;

Review comment:
   No idea if we'll ever want to add anything else to this file, but better 
to be safe and forward compatible than sad
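A minimal sketch of the append-only, versioned layout the code comment describes, under some assumptions: the class `ProcessFileSketch`, the one-field-per-line layout, and the choice to ignore any lines after the UUID are illustrative and are not claimed to match what the PR implements. Only the idea of a leading version number followed by the UUID comes from the diff.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;

// Hypothetical reader/writer for a process metadata file.
final class ProcessFileSketch {
    static final int CURRENT_VERSION = 0;

    // Returns the persisted UUID, creating the file with a fresh UUID if it is missing.
    static UUID readOrCreate(final Path processFile) throws IOException {
        if (Files.exists(processFile)) {
            try (BufferedReader reader =
                     Files.newBufferedReader(processFile, StandardCharsets.UTF_8)) {
                final String versionLine = reader.readLine();
                final String uuidLine = reader.readLine();
                if (versionLine == null || uuidLine == null) {
                    throw new IOException("Truncated process file: " + processFile);
                }
                final int version = Integer.parseInt(versionLine.trim());
                if (version < 0) {
                    throw new IOException("Invalid version " + version + " in " + processFile);
                }
                // Newer versions may only append fields, so everything after the
                // UUID line is skipped and an older reader still finds its data.
                return UUID.fromString(uuidLine.trim());
            }
        }
        final UUID processId = UUID.randomUUID();
        Files.write(
            processFile,
            Arrays.asList(String.valueOf(CURRENT_VERSION), processId.toString()),
            StandardCharsets.UTF_8);
        return processId;
    }
}
```

Because fields are only ever appended, a version 0 reader can open a file written by a later version and still recover the same process id.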









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802766



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {
-if (state == State.ERROR) {
-log.info("Streams client is already in the terminal state ERROR, 
all resources are closed and the client has stopped.");
+if (state == State.ERROR || state == State.NOT_RUNNING) {

Review comment:
   Something I noticed during testing: I feel it makes sense for the 
handling of ERROR and NOT_RUNNING to be parallel (same for the PENDING_ 
flavors). This is a slight change in behavior: now, if a user calls `close()` 
while the instance is already closing, it will wait for the ongoing shutdown 
to complete before returning (with timeout).
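The behavior described here can be sketched in isolation as follows. `CloseSketch`, `setState`, and `awaitOngoingShutdown` are hypothetical names for this illustration; the state names and the idea of waiting on the matching terminal state with a timeout are taken from the diff, and everything else is an assumption.

```java
// Hypothetical stand-in for the client's state handling during close().
final class CloseSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll(); // wake up anyone blocked in waitOnState
        }
    }

    // Blocks until the target state is reached or the timeout elapses.
    private boolean waitOnState(final State target, final long timeoutMs) {
        final long begin = System.currentTimeMillis();
        synchronized (stateLock) {
            long elapsed = 0L;
            while (state != target) {
                if (elapsed >= timeoutMs) {
                    return false;
                }
                try {
                    stateLock.wait(timeoutMs - elapsed);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                elapsed = System.currentTimeMillis() - begin;
            }
            return true;
        }
    }

    // Covers only the "already stopped or stopping" branches of close().
    boolean awaitOngoingShutdown(final long timeoutMs) {
        final State observed;
        synchronized (stateLock) {
            observed = state;
        }
        switch (observed) {
            case ERROR:
            case NOT_RUNNING:
                return true; // terminal: nothing left to wait for
            case PENDING_ERROR:
                return waitOnState(State.ERROR, timeoutMs);
            case PENDING_SHUTDOWN:
                return waitOnState(State.NOT_RUNNING, timeoutMs);
            default:
                // A real close() would start the shutdown here; out of scope for the sketch.
                throw new IllegalStateException("Client is not stopping: " + observed);
        }
    }
}
```

A second caller that arrives during shutdown therefore gets `true` only once the terminal state is actually reached, and `false` if the timeout expires first.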









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
 return streamThread;
 }
 
+private static Metrics getMetrics(final StreamsConfig config, final Time 
time, final String clientId) {
+final MetricConfig metricConfig = new MetricConfig()
+.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS);
+final List<MetricsReporter> reporters = 
config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+  
MetricsReporter.class,
+  
Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
+final JmxReporter jmxReporter = new JmxReporter();
+jmxReporter.configure(config.originals());
+reporters.add(jmxReporter);
+final MetricsContext metricsContext = new 
KafkaMetricsContext(JMX_PREFIX,
+  
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+return new Metrics(metricConfig, reporters, time, metricsContext);
+}
+
+private int getNumStreamThreads(final boolean hasGlobalTopology) {
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+"must subscribe to at least one 
source topic or global table.");
+}
+return numStreamThreads;

Review comment:
   Just tried to factor some of the self-contained logic into helper 
methods, since I found it incredibly difficult to get oriented within the 
super-long KafkaStreams constructor









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565801932



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
  final Time time) throws StreamsException {
 this.config = config;
 this.time = time;
+
+this.internalTopologyBuilder = internalTopologyBuilder;
+internalTopologyBuilder.rewriteTopology(config);
+
+// sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
+taskTopology = internalTopologyBuilder.buildTopology();
+globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+
+final boolean hasGlobalTopology = globalTaskTopology != null;
+final boolean hasPersistentStores = 
taskTopology.hasPersistentLocalStore() ||
+(hasGlobalTopology && 
globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, 
hasPersistentStores);
+processId = stateDirectory.initializeProcessId();

Review comment:
   This is the only logical change in the KafkaStreams constructor: the 
rest of the diff is due to moving things around in order to get everything 
initialized in the proper order









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565746851



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,8 +783,27 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
  final Time time) throws StreamsException {
 this.config = config;
 this.time = time;
+
+this.internalTopologyBuilder = internalTopologyBuilder;
+internalTopologyBuilder.rewriteTopology(config);
+
+// sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
+taskTopology = internalTopologyBuilder.buildTopology();
+globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+
+final boolean hasGlobalTopology = globalTaskTopology != null;
+final boolean hasPersistentStores = 
taskTopology.hasPersistentLocalStore() ||
+(hasGlobalTopology && 
globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, 
hasPersistentStores);
+processId = stateDirectory.getProcessId();

Review comment:
   This is the only real change in the constructor, but I had to move a few 
things around and tried to organize them as I went




