ZihanLi58 commented on code in PR #3557: URL: https://github.com/apache/gobblin/pull/3557#discussion_r967366821
########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS. 
+ */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); + } + + @Override + protected void processMessage(DecodeableKafkaRecord message) { + String specUri = (String) message.getKey(); + SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue(); + + Long timestamp = value.getTimestamp(); + String operation = value.getOperationType().name(); + log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation); + + // If we've already processed a message with this timestamp before then skip duplicate message + if (timestampsSeenBefore.contains(timestamp)) { Review Comment: If you want to filter out duplicates, I think timestamp + specUri should be the key. Using only the timestamp seems risky. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java: ########## @@ -119,6 +122,8 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true); this.isTopologySpecFactoryEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true); + this.isSpecStoreChangeMonitorEnabled = + ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_SPEC_STORE_CHANGE_MONITOR_ENABLED_KEY, true); Review Comment: Shouldn't this be false by default?
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java: ########## @@ -457,4 +458,15 @@ public void remove(URI uri, Properties headers, boolean triggerListener) { public Object getSyncObject(String specUri) { return this.specSyncObjects.getOrDefault(specUri, null); } + + public Spec getSpecFromStore(String specUri) { + try { + URI uri = new URI(specUri); + return specStore.getSpec(uri); + } catch (SpecNotFoundException e) { + throw new RuntimeException("Could not find Spec from Spec Store for URI: " + specUri, e); Review Comment: In this case, I don't think we want to throw an exception. If it's a delete operation, the spec will no longer be in the catalog. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS. + */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); + } + + @Override + protected void processMessage(DecodeableKafkaRecord message) { + String specUri = (String) message.getKey(); + SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue(); + + Long timestamp = value.getTimestamp(); + String operation = value.getOperationType().name(); + log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation); + + // If we've already processed a message with this timestamp before then skip duplicate message + if (timestampsSeenBefore.contains(timestamp)) { + return; + } + + Spec spec = this.flowCatalog.getSpecFromStore(specUri); Review Comment: What happens if it's delete? 
You will get null, or even an exception, in this case. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS.
+ */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); + } + + @Override + protected void processMessage(DecodeableKafkaRecord message) { + String specUri = (String) message.getKey(); + SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue(); + + Long timestamp = value.getTimestamp(); + String operation = value.getOperationType().name(); + log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation); + + // If we've already processed a message with this timestamp before then skip duplicate message + if (timestampsSeenBefore.contains(timestamp)) { + return; + } + + Spec spec = this.flowCatalog.getSpecFromStore(specUri); + + // Call respective action for the type of change received + if (operation == "CREATE") { + scheduler.onAddSpec(spec); Review Comment: What will happen if we fail to schedule it? It seems we fail silently and skip it. I suggest emitting some metrics here to indicate that something went wrong, so that we can monitor the behavior. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS. 
+ */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); + } + + @Override + protected void processMessage(DecodeableKafkaRecord message) { + String specUri = (String) message.getKey(); + SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue(); + + Long timestamp = value.getTimestamp(); + String operation = value.getOperationType().name(); + log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation); + + // If we've already processed a message with this timestamp before then skip duplicate message + if (timestampsSeenBefore.contains(timestamp)) { + return; + } + + Spec spec = this.flowCatalog.getSpecFromStore(specUri); + + // Call respective action for the type of change received + if (operation == "CREATE") { + scheduler.onAddSpec(spec); + } else if (operation == "INSERT") { + scheduler.onUpdateSpec(spec); + } else if (operation == "DELETE") { + scheduler.onDeleteSpec(spec.getUri(), spec.getVersion()); Review Comment: Same question as above: for a delete operation, the spec will no longer be in the spec store, so you will not have its info here. ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java: ########## @@ -457,4 +458,15 @@ public void remove(URI uri, Properties headers, boolean triggerListener) { public Object getSyncObject(String specUri) { return this.specSyncObjects.getOrDefault(specUri, null); } + + public Spec getSpecFromStore(String specUri) { Review Comment: We already have the method getSpecWrapper; why do we need to create a new one?
########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS. 
+ */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); + } + + @Override + protected void processMessage(DecodeableKafkaRecord message) { + String specUri = (String) message.getKey(); + SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue(); + + Long timestamp = value.getTimestamp(); + String operation = value.getOperationType().name(); + log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation); + + // If we've already processed a message with this timestamp before then skip duplicate message + if (timestampsSeenBefore.contains(timestamp)) { + return; + } + + Spec spec = this.flowCatalog.getSpecFromStore(specUri); + + // Call respective action for the type of change received + if (operation == "CREATE") { + scheduler.onAddSpec(spec); + } else if (operation == "INSERT") { + scheduler.onUpdateSpec(spec); + } else if (operation == "DELETE") { + scheduler.onDeleteSpec(spec.getUri(), spec.getVersion()); + } else { + log.warn("Received unsupported change type of operation {}. Expected values to be in [CREATE, INSERT, DELETE]", operation); + return; + } + + timestampsSeenBefore.add(timestamp); Review Comment: How is this processMessage method called: from a single thread, or can it be called from multiple threads? If it is multi-threaded, isn't it still possible to process the same message concurrently? Do we rely on any assumption that prevents this from happening?
If so, put them in the comment ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS. 
+ */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); Review Comment: I suggest using a cache in this case. The size of the HashSet will keep increasing while the service is running, which might eventually lead to an OOM issue. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.service.monitoring; + +import java.util.HashSet; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; + + +/** + * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} schema to process Kafka messages received + * from the consumer service. This monitor responds to changes to flow specs (creations, updates, deletes) and acts as + * a connector between the API and execution layers of GaaS. + */ +@Slf4j +public class SpecStoreChangeMonitor extends HighLevelConsumer { + protected HashSet<Long> timestampsSeenBefore; + + @Inject + protected FlowCatalog flowCatalog; + + @Inject + protected GobblinServiceJobScheduler scheduler; + + @Inject + public SpecStoreChangeMonitor(String topic, Config config, int numThreads) { + super(topic, config, numThreads); + this.timestampsSeenBefore = new HashSet(); + } + + @Override + protected void processMessage(DecodeableKafkaRecord message) { + String specUri = (String) message.getKey(); + SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue(); + + Long timestamp = value.getTimestamp(); + String operation = value.getOperationType().name(); + log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation); + + // If we've already processed a message with this timestamp before then skip duplicate message + if (timestampsSeenBefore.contains(timestamp)) { + return; + } + + Spec spec = this.flowCatalog.getSpecFromStore(specUri); + + // Call respective action for the type of change received + if (operation == "CREATE") { Review Comment: I see in schema you define 
those as "insert", "update", and "delete", so these values don't seem correct here? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java: ########## @@ -70,6 +70,9 @@ public class GobblinServiceConfiguration { @Getter private final boolean isHelixManagerEnabled; + @Getter + private final boolean isSpecStoreChangeMonitorEnabled; Review Comment: Can we leverage the warmStandbyEnabled key? We only need this feature when warm standby is enabled, where we use the CDC stream for message forwarding, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
