kfaraz commented on code in PR #18341:
URL: https://github.com/apache/druid/pull/18341#discussion_r2258853857
##########
processing/src/main/java/org/apache/druid/guice/Binders.java:
##########
@@ -59,4 +60,11 @@ public static MapBinder<String, TaskLogs> taskLogsBinder(Binder binder)
{
return PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
}
+
+ public static <T extends TaskLogs> void bindTaskLogs(Binder binder, String type, Class<T> clazz)
Review Comment:
Please add a short javadoc to this.
##########
processing/src/test/java/org/apache/druid/guice/BindersTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+class BindersTest
Review Comment:
Thanks for adding this!
##########
processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java:
##########
@@ -39,4 +39,9 @@ default void pushTaskReports(String taskid, File reportFile) throws IOException
default void pushTaskStatus(String taskid, File reportFile) throws IOException
{
}
+
+ default boolean logPushEnabled()
Review Comment:
It seems odd to have a flag that disables the one thing this interface
is meant to do.
Given that we already have a `noop` implementation, where do we need this?
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java:
##########
@@ -81,12 +85,16 @@
public class KubernetesOverlordModule implements DruidModule
{
- private static final Logger log = new Logger(KubernetesOverlordModule.class);
+ private static final Logger log = new EmittingLogger(KubernetesOverlordModule.class);
Review Comment:
Nit: Why is this change required?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogs.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.tasklogs;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SwitchingTaskLogs implements TaskLogs
+{
+ private final TaskLogs reportTaskLogs;
+ private final TaskLogs logStreamer;
+ private final TaskLogs logPusher;
+
+ @Inject
+ public SwitchingTaskLogs(
+ @Named("reports") TaskLogs reportTaskLogs,
Review Comment:
There should be an argument for the default type as well.
The other arguments can be treated as optional bindings: if any of them
turns out to be unbound (i.e. null), `SwitchingTaskLogs` should fall back to
the default one for it.
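A minimal sketch of that fallback, assuming plain constructor arguments where null means "not bound". `TaskLogsSketch` and `SwitchingTaskLogsSketch` are hypothetical stand-ins, not the PR's classes, and the Guice annotations are left out on purpose:

```java
import java.util.Objects;

// Hypothetical stand-in for Druid's TaskLogs, reduced to a single method for illustration.
interface TaskLogsSketch
{
  String type();
}

// Illustrative only: each specific delegate is optional and falls back to the default.
final class SwitchingTaskLogsSketch
{
  private final TaskLogsSketch reportTaskLogs;
  private final TaskLogsSketch logStreamer;
  private final TaskLogsSketch logPusher;

  SwitchingTaskLogsSketch(
      TaskLogsSketch defaultTaskLogs, // required
      TaskLogsSketch reportTaskLogs,  // may be null when not bound
      TaskLogsSketch logStreamer,     // may be null when not bound
      TaskLogsSketch logPusher        // may be null when not bound
  )
  {
    Objects.requireNonNull(defaultTaskLogs, "defaultTaskLogs");
    this.reportTaskLogs = reportTaskLogs != null ? reportTaskLogs : defaultTaskLogs;
    this.logStreamer = logStreamer != null ? logStreamer : defaultTaskLogs;
    this.logPusher = logPusher != null ? logPusher : defaultTaskLogs;
  }

  TaskLogsSketch reportTaskLogs()
  {
    return reportTaskLogs;
  }

  TaskLogsSketch logStreamer()
  {
    return logStreamer;
  }

  TaskLogsSketch logPusher()
  {
    return logPusher;
  }
}
```

With this shape the module only has to supply whatever is configured, and the fallback decision lives in one place.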
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -464,6 +455,38 @@ private void configureOverlordHelpers(Binder binder)
);
}
+ private void bindTaskLogStreamer(Binder binder)
+ {
+ String logsType = properties.getProperty("druid.indexer.logs.type");
+ if ("switching".equals(logsType)) {
+ bindSwitchingTaskLogs(binder);
+ } else {
+ bindSwitchingTaskLogStreamer(binder);
+ }
+ }
+
+ private static void bindSwitchingTaskLogStreamer(Binder binder)
+ {
+ binder.bind(TaskLogStreamer.class)
+ .to(SwitchingTaskLogStreamer.class)
+ .in(LazySingleton.class);
+ binder.bind(new TypeLiteral<List<TaskLogStreamer>>() {})
+ .toProvider(new ListProvider<TaskLogStreamer>().add(TaskLogs.class))
+ .in(LazySingleton.class);
+
+ binder.bind(TaskLogStreamer.class)
+ .annotatedWith(Names.named("taskstreamer"))
+ .to(TaskRunnerTaskLogStreamer.class)
+ .in(LazySingleton.class);
+ }
+
+ private static void bindSwitchingTaskLogs(Binder binder)
Review Comment:
The contents of this method can be inlined.
##########
processing/src/main/java/org/apache/druid/guice/Binders.java:
##########
@@ -59,4 +60,11 @@ public static MapBinder<String, TaskLogs> taskLogsBinder(Binder binder)
{
return PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
}
+
+ public static <T extends TaskLogs> void bindTaskLogs(Binder binder, String type, Class<T> clazz)
+ {
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class)).addBinding(type).to(clazz);
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class, Names.named("defaultType"))).addBinding(type).to(clazz);
Review Comment:
"defaultType" should be a constant in `TaskLogs` or `SwitchingTaskLogs`.
Use a more qualified name such as `switching.defaultType`.
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java:
##########
@@ -290,16 +298,83 @@ public RunnerStrategy get()
private void configureTaskLogs(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
+ PolyBind.createChoice(binder, PROPERTY_PREFIX_SWITCHING + ".defaultType", Key.get(TaskLogs.class, Names.named("defaultType")), Key.get(FileTaskLogs.class));
+
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
- taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
- taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
+
+ taskLogBinder.addBinding("switching").to(SwitchingTaskLogs.class);
+ Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+ Binders.bindTaskLogs(binder, "file", FileTaskLogs.class);
+
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
+ binder.bind(SwitchingTaskLogs.class).in(LazySingleton.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
+ binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
+ }
+
+ @Provides
+ @Named("streamer")
Review Comment:
Please move these constants to `SwitchingTaskLogs` and use more consistent
names like `switching.logStreamType`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -792,7 +792,7 @@ public Response doGetLog(
return Response.status(Response.Status.NOT_FOUND)
.entity(
"No log was found for this task. "
- + "The task may not exist, or it may not have begun running yet."
+ + "The task may not exist, it may not have begun running yet or the taskStream configuration may be faulty, kindly verify the same."
Review Comment:
```suggestion
+ "No logs found for this task. Ensure that the task is running and logging is configured correctly."
```
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java:
##########
@@ -81,12 +85,16 @@
public class KubernetesOverlordModule implements DruidModule
{
- private static final Logger log = new Logger(KubernetesOverlordModule.class);
+ private static final Logger log = new EmittingLogger(KubernetesOverlordModule.class);
private static final String K8SANDWORKER_PROPERTIES_PREFIX = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8sAndWorker";
private static final String RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING = K8SANDWORKER_PROPERTIES_PREFIX + ".runnerStrategy.%s";
private static final String HTTPCLIENT_PROPERITES_PREFIX = K8SANDWORKER_PROPERTIES_PREFIX + ".http";
+ private static final String PROPERTY_PREFIX_SWITCHING = "druid.indexer.logs.switching";
+ private static final String PROPERTY_KEY_SWITCHING_PUSH_TYPE = PROPERTY_PREFIX_SWITCHING + ".pushType";
+ private static final String PROPERTY_KEY_SWITCHING_STREAM_TYPE = PROPERTY_PREFIX_SWITCHING + ".streamType";
+ private static final String PROPERTY_KEY_SWITCHING_REPORTS_TYPE = PROPERTY_PREFIX_SWITCHING + ".reportsType";
Review Comment:
These constants should exist in `SwitchingTaskLogs`.
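One possible layout, as a sketch only. The property keys are the ones from this diff; the holder class name and the `NAME_*` binding names are hypothetical, and in the PR the constants would sit directly in `SwitchingTaskLogs`:

```java
// Hypothetical holder used for illustration; the real home would be SwitchingTaskLogs.
final class SwitchingTaskLogsConstants
{
  // Runtime property keys, taken from the diff under review.
  static final String PROPERTY_PREFIX = "druid.indexer.logs.switching";
  static final String PROPERTY_DEFAULT_TYPE = PROPERTY_PREFIX + ".defaultType";
  static final String PROPERTY_PUSH_TYPE = PROPERTY_PREFIX + ".pushType";
  static final String PROPERTY_STREAM_TYPE = PROPERTY_PREFIX + ".streamType";
  static final String PROPERTY_REPORTS_TYPE = PROPERTY_PREFIX + ".reportsType";

  // Qualified names for the named bindings; these exact names are an assumption.
  static final String NAME_DEFAULT_TYPE = "switching.defaultType";
  static final String NAME_PUSH_TYPE = "switching.pushType";
  static final String NAME_STREAM_TYPE = "switching.streamType";
  static final String NAME_REPORTS_TYPE = "switching.reportsType";

  private SwitchingTaskLogsConstants()
  {
    // Constants only; not meant to be instantiated.
  }
}
```

Both modules could then refer to the same constants instead of repeating string literals.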
##########
processing/src/main/java/org/apache/druid/guice/Binders.java:
##########
@@ -59,4 +60,11 @@ public static MapBinder<String, TaskLogs> taskLogsBinder(Binder binder)
{
return PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
}
+
+ public static <T extends TaskLogs> void bindTaskLogs(Binder binder, String type, Class<T> clazz)
+ {
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class)).addBinding(type).to(clazz);
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class, Names.named("defaultType"))).addBinding(type).to(clazz);
+ binder.bind(Key.get(TaskLogs.class, Names.named(type))).to(clazz);
Review Comment:
Where would we need to refer to a `TaskLogs` object by the name of its
implementation?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/FileTaskLogsConfig.java:
##########
@@ -19,11 +19,13 @@
package org.apache.druid.indexing.common.config;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
import java.io.File;
+@JsonIgnoreProperties(ignoreUnknown = true)
Review Comment:
Did you run into any issue while deserializing?
Unknown properties should already be ignored by default.
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java:
##########
@@ -290,16 +298,83 @@ public RunnerStrategy get()
private void configureTaskLogs(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
+ PolyBind.createChoice(binder, PROPERTY_PREFIX_SWITCHING + ".defaultType", Key.get(TaskLogs.class, Names.named("defaultType")), Key.get(FileTaskLogs.class));
+
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
- taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
- taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
+
+ taskLogBinder.addBinding("switching").to(SwitchingTaskLogs.class);
+ Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+ Binders.bindTaskLogs(binder, "file", FileTaskLogs.class);
+
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
+ binder.bind(SwitchingTaskLogs.class).in(LazySingleton.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
+ binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
Review Comment:
I wonder if these bindings are really needed here. Doesn't the other module
`IndexingServiceTaskLogsModule` already perform the same bindings on the
Overlord?
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java:
##########
@@ -290,16 +298,83 @@ public RunnerStrategy get()
private void configureTaskLogs(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
+ PolyBind.createChoice(binder, PROPERTY_PREFIX_SWITCHING + ".defaultType", Key.get(TaskLogs.class, Names.named("defaultType")), Key.get(FileTaskLogs.class));
+
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
- taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
- taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
+
+ taskLogBinder.addBinding("switching").to(SwitchingTaskLogs.class);
+ Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+ Binders.bindTaskLogs(binder, "file", FileTaskLogs.class);
+
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
+ binder.bind(SwitchingTaskLogs.class).in(LazySingleton.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
+ binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
+ }
+
+ @Provides
+ @Named("streamer")
+ public TaskLogs provideStreamer(
Review Comment:
This logic shouldn't be needed here; `SwitchingTaskLogs` should handle it
implicitly.
The same applies to the other methods.
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -464,6 +455,38 @@ private void configureOverlordHelpers(Binder binder)
);
}
+ private void bindTaskLogStreamer(Binder binder)
+ {
+ String logsType = properties.getProperty("druid.indexer.logs.type");
+ if ("switching".equals(logsType)) {
+ bindSwitchingTaskLogs(binder);
+ } else {
+ bindSwitchingTaskLogStreamer(binder);
+ }
+ }
+
+ private static void bindSwitchingTaskLogStreamer(Binder binder)
Review Comment:
This method can be inlined too since the calling method itself is small
enough.
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -464,6 +455,38 @@ private void configureOverlordHelpers(Binder binder)
);
}
+ private void bindTaskLogStreamer(Binder binder)
+ {
+ String logsType = properties.getProperty("druid.indexer.logs.type");
+ if ("switching".equals(logsType)) {
+ bindSwitchingTaskLogs(binder);
Review Comment:
I am not entirely sure if we should be doing this.
For the Overlord, `SwitchingTaskLogs` should still be treated as one of the
multiple bound log streamers that the `Overlord` would look at when asked for
the logs of a task.
Maybe I am missing some use cases. Could you please explain this change?
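To make the concern concrete, here is a rough sketch of what I mean by "one of the multiple bound log streamers", assuming the Overlord asks each streamer in order and returns the first log it finds. `LogStreamerSketch` and `FirstMatchLogStreamerSketch` are hypothetical names, and this is not the actual `SwitchingTaskLogStreamer` code:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a task log streamer; returns the log text if this source has it.
interface LogStreamerSketch
{
  Optional<String> streamTaskLog(String taskId);
}

// Illustrative only: consults each bound streamer in order and returns the first hit.
final class FirstMatchLogStreamerSketch implements LogStreamerSketch
{
  private final List<LogStreamerSketch> delegates;

  FirstMatchLogStreamerSketch(List<LogStreamerSketch> delegates)
  {
    this.delegates = List.copyOf(delegates);
  }

  @Override
  public Optional<String> streamTaskLog(String taskId)
  {
    for (LogStreamerSketch delegate : delegates) {
      Optional<String> log = delegate.streamTaskLog(taskId);
      if (log.isPresent()) {
        return log;
      }
    }
    // No bound streamer had a log for this task.
    return Optional.empty();
  }
}
```

Under that model, a `switching` implementation would simply be one more entry in the list rather than a replacement for it.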
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]