This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 83299e98829 Miscellaneous cleanup in the supervisor API flow. (#17144) 83299e98829 is described below commit 83299e9882991f8e0d4f648c07d027c4662a807f Author: Abhishek Radhakrishnan <abhishek.r...@gmail.com> AuthorDate: Tue Sep 24 13:06:23 2024 -0700 Miscellaneous cleanup in the supervisor API flow. (#17144) Extracting a few miscellaneous non-functional changes from the batch supervisor branch: - Replace anonymous inner classes with lambda expressions in the SQL supervisor manager layer - Add explicit @Nullable annotations in DynamicConfigProviderUtils to make the IDE happy - Small variable renames (copy-paste error perhaps) and fix typos - Add the table name to this exception message: Delete the supervisor from the table[%s] in the database... - Prefer CollectionUtils.isNullOrEmpty() over list == null || list.isEmpty(). We can change the Preconditions checks to throw DruidException separately for a batch of APIs at a time. 
--- .../overlord/supervisor/SupervisorResource.java | 7 +- .../druid/utils/DynamicConfigProviderUtils.java | 7 +- .../metadata/SQLMetadataSupervisorManager.java | 271 ++++++++------------- .../metadata/SQLMetadataSupervisorManagerTest.java | 15 +- 4 files changed, 110 insertions(+), 190 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 0cf58d38512..130f617d59d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -50,6 +50,7 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -123,7 +124,7 @@ public class SupervisorResource return asLeaderWithSupervisorManager( manager -> { Preconditions.checkArgument( - spec.getDataSources() != null && spec.getDataSources().size() > 0, + !CollectionUtils.isNullOrEmpty(spec.getDataSources()), "No dataSources found to perform authorization checks" ); final Set<ResourceAction> resourceActions; @@ -412,7 +413,7 @@ public class SupervisorResource public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest) { List<Integer> taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds(); - if (taskGroupIds == null || taskGroupIds.isEmpty()) { + if (CollectionUtils.isNullOrEmpty(taskGroupIds)) { return Response.status(Response.Status.BAD_REQUEST) .entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty")) .build(); @@ 
-533,7 +534,7 @@ public class SupervisorResource authorizerMapper ) ); - if (authorizedHistoryForId.size() > 0) { + if (!authorizedHistoryForId.isEmpty()) { return Response.ok(authorizedHistoryForId).build(); } } diff --git a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java index 0b47116f0e7..38e227987a6 100644 --- a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java @@ -22,13 +22,14 @@ package org.apache.druid.utils; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.metadata.DynamicConfigProvider; +import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class DynamicConfigProviderUtils { - public static Map<String, String> extraConfigAndSetStringMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper) + public static Map<String, String> extraConfigAndSetStringMap(@Nullable Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper) { HashMap<String, String> newConfig = new HashMap<>(); if (config != null) { @@ -43,7 +44,7 @@ public class DynamicConfigProviderUtils return newConfig; } - public static Map<String, Object> extraConfigAndSetObjectMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper) + public static Map<String, Object> extraConfigAndSetObjectMap(@Nullable Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper) { HashMap<String, Object> newConfig = new HashMap<>(); if (config != null) { @@ -58,7 +59,7 @@ public class DynamicConfigProviderUtils return newConfig; } - private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper) + private static Map<String, String> 
extraConfigFromProvider(@Nullable Object dynamicConfigProviderJson, ObjectMapper mapper) { if (dynamicConfigProviderJson != null) { DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 91d71819721..5564d715792 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -20,7 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; @@ -38,9 +37,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; -import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; -import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.StatementContext; @@ -91,24 +88,19 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager public void insert(final String id, final SupervisorSpec spec) { dbi.withHandle( - new HandleCallback<Void>() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - StringUtils.format( - "INSERT INTO %s (spec_id, created_date, payload) VALUES (:spec_id, :created_date, :payload)", - getSupervisorsTable() - ) - ) - .bind("spec_id", id) - .bind("created_date", DateTimes.nowUtc().toString()) - .bind("payload", 
jsonMapper.writeValueAsBytes(spec)) - .execute(); + handle -> { + handle.createStatement( + StringUtils.format( + "INSERT INTO %s (spec_id, created_date, payload) VALUES (:spec_id, :created_date, :payload)", + getSupervisorsTable() + ) + ) + .bind("spec_id", id) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("payload", jsonMapper.writeValueAsBytes(spec)) + .execute(); - return null; - } + return null; } ); } @@ -118,54 +110,29 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager { return ImmutableMap.copyOf( dbi.withHandle( - new HandleCallback<Map<String, List<VersionedSupervisorSpec>>>() - { - @Override - public Map<String, List<VersionedSupervisorSpec>> withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format( - "SELECT id, spec_id, created_date, payload FROM %1$s ORDER BY id DESC", - getSupervisorsTable() - ) - ).map( - new ResultSetMapper<Pair<String, VersionedSupervisorSpec>>() - { - @Override - public Pair<String, VersionedSupervisorSpec> map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - return Pair.of( - r.getString("spec_id"), - createVersionSupervisorSpecFromResponse(r) - ); - } - } - ).fold( - new HashMap<>(), - new Folder3<Map<String, List<VersionedSupervisorSpec>>, Pair<String, VersionedSupervisorSpec>>() - { - @Override - public Map<String, List<VersionedSupervisorSpec>> fold( - Map<String, List<VersionedSupervisorSpec>> retVal, - Pair<String, VersionedSupervisorSpec> pair, - FoldController foldController, - StatementContext statementContext - ) - { - try { - String specId = pair.lhs; - retVal.computeIfAbsent(specId, sId -> new ArrayList<>()).add(pair.rhs); - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - } - ); - } - } + (HandleCallback<Map<String, List<VersionedSupervisorSpec>>>) handle -> handle.createQuery( + StringUtils.format( + "SELECT id, spec_id, created_date, payload FROM %1$s ORDER BY id DESC", + 
getSupervisorsTable() + ) + ).map( + (index, r, ctx) -> Pair.of( + r.getString("spec_id"), + createVersionSupervisorSpecFromResponse(r) + ) + ).fold( + new HashMap<>(), + (Folder3<Map<String, List<VersionedSupervisorSpec>>, Pair<String, VersionedSupervisorSpec>>) (retVal, pair, foldController, statementContext) -> { + try { + String specId = pair.lhs; + retVal.computeIfAbsent(specId, sId -> new ArrayList<>()).add(pair.rhs); + return retVal; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + ) ) ); } @@ -175,30 +142,15 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager { return ImmutableList.copyOf( dbi.withHandle( - new HandleCallback<List<VersionedSupervisorSpec>>() - { - @Override - public List<VersionedSupervisorSpec> withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format( - "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC", - getSupervisorsTable() - ) - ).bind("spec_id", id - ).map( - new ResultSetMapper<VersionedSupervisorSpec>() - { - @Override - public VersionedSupervisorSpec map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - return createVersionSupervisorSpecFromResponse(r); - } - } - ).list(); - } - } + (HandleCallback<List<VersionedSupervisorSpec>>) handle -> handle.createQuery( + StringUtils.format( + "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC", + getSupervisorsTable() + ) + ) + .bind("spec_id", id) + .map((index, r, ctx) -> createVersionSupervisorSpecFromResponse(r)) + .list() ) ); } @@ -207,12 +159,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager { SupervisorSpec payload; try { - payload = jsonMapper.readValue( - r.getBytes("payload"), - new TypeReference<SupervisorSpec>() - { - } - ); + payload = jsonMapper.readValue(r.getBytes("payload"), SupervisorSpec.class); } catch (JsonParseException | JsonMappingException e) { 
log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id")); @@ -229,74 +176,54 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager { return ImmutableMap.copyOf( dbi.withHandle( - new HandleCallback<Map<String, SupervisorSpec>>() - { - @Override - public Map<String, SupervisorSpec> withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format( - "SELECT r.spec_id, r.payload " - + "FROM %1$s r " - + "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest " - + "ON r.id = latest.id", - getSupervisorsTable() - ) - ).map( - new ResultSetMapper<Pair<String, SupervisorSpec>>() - { - @Nullable - @Override - public Pair<String, SupervisorSpec> map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - try { - return Pair.of( - r.getString("spec_id"), - jsonMapper.readValue( - r.getBytes("payload"), new TypeReference<SupervisorSpec>() - { - } - ) - ); - } - catch (IOException e) { - String exceptionMessage = StringUtils.format( - "Could not map json payload to a SupervisorSpec for spec_id: [%s]." 
- + " Delete the supervisor from the database and re-submit it to the overlord.", - r.getString("spec_id") - ); - log.error(e, exceptionMessage); - return null; - } - } + (HandleCallback<Map<String, SupervisorSpec>>) handle -> handle.createQuery( + StringUtils.format( + "SELECT r.spec_id, r.payload " + + "FROM %1$s r " + + "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest " + + "ON r.id = latest.id", + getSupervisorsTable() + ) + ).map( + new ResultSetMapper<Pair<String, SupervisorSpec>>() + { + @Nullable + @Override + public Pair<String, SupervisorSpec> map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return Pair.of( + r.getString("spec_id"), + jsonMapper.readValue(r.getBytes("payload"), SupervisorSpec.class) + ); + } + catch (IOException e) { + String exceptionMessage = StringUtils.format( + "Could not map json payload to a SupervisorSpec for spec_id: [%s]." + + " Delete the supervisor from the table[%s] in the database and re-submit it to the overlord.", + r.getString("spec_id"), + getSupervisorsTable() + ); + log.error(e, exceptionMessage); + return null; } - ).fold( - new HashMap<>(), - new Folder3<Map<String, SupervisorSpec>, Pair<String, SupervisorSpec>>() - { - @Override - public Map<String, SupervisorSpec> fold( - Map<String, SupervisorSpec> retVal, - Pair<String, SupervisorSpec> stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) - { - try { - if (null != stringObjectMap) { - retVal.put(stringObjectMap.lhs, stringObjectMap.rhs); - } - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } + } + } + ).fold( + new HashMap<>(), + (Folder3<Map<String, SupervisorSpec>, Pair<String, SupervisorSpec>>) (retVal, stringObjectMap, foldController, statementContext) -> { + try { + if (null != stringObjectMap) { + retVal.put(stringObjectMap.lhs, stringObjectMap.rhs); } - ); - } - } + return retVal; + } + catch (Exception e) { + throw new 
RuntimeException(e); + } + } + ) ) ); } @@ -304,10 +231,10 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager @Override public Map<String, SupervisorSpec> getLatestActiveOnly() { - Map<String, SupervisorSpec> supervisors = getLatest(); - Map<String, SupervisorSpec> activeSupervisors = new HashMap<>(); + final Map<String, SupervisorSpec> supervisors = getLatest(); + final Map<String, SupervisorSpec> activeSupervisors = new HashMap<>(); for (Map.Entry<String, SupervisorSpec> entry : supervisors.entrySet()) { - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // Terminated supervisor will have its latest supervisorSpec as NoopSupervisorSpec // (NoopSupervisorSpec is used as a tombstone marker) if (!(entry.getValue() instanceof NoopSupervisorSpec)) { activeSupervisors.put(entry.getKey(), entry.getValue()); @@ -319,16 +246,16 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager @Override public Map<String, SupervisorSpec> getLatestTerminatedOnly() { - Map<String, SupervisorSpec> supervisors = getLatest(); - Map<String, SupervisorSpec> activeSupervisors = new HashMap<>(); + final Map<String, SupervisorSpec> supervisors = getLatest(); + final Map<String, SupervisorSpec> terminatedSupervisors = new HashMap<>(); for (Map.Entry<String, SupervisorSpec> entry : supervisors.entrySet()) { - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // Terminated supervisor will have its latest supervisorSpec as NoopSupervisorSpec // (NoopSupervisorSpec is used as a tombstone marker) if (entry.getValue() instanceof NoopSupervisorSpec) { - activeSupervisors.put(entry.getKey(), entry.getValue()); + terminatedSupervisors.put(entry.getKey(), entry.getValue()); } } - return ImmutableMap.copyOf(activeSupervisors); + return ImmutableMap.copyOf(terminatedSupervisors); } @Override diff --git 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index b43958fc746..ae931697476 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -37,8 +37,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.util.Collections; import java.util.List; @@ -65,16 +63,9 @@ public class SQLMetadataSupervisorManagerTest public void cleanup() { connector.getDBI().withHandle( - new HandleCallback<Void>() - { - @Override - public Void withHandle(Handle handle) - { - handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable())) - .execute(); - return null; - } - } + handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable())) + .execute() + ); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org