zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1035630266
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
throws RetriableOperationException, AttemptsExceededException {
String tableName = spec.getTableSpec().getTableName();
+ AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
Review Comment:
Just curious: why do we need to create those 3 new methods? It seems that
there is no logic change.
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,98 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
}
}
+ private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+ List<Header> headers, List<NameValuePair> parameters) throws Exception {
+ String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+ LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+ pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+ pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+ pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+ pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+ SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+ URI outputSegmentDirURI = null;
+ if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+ outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ }
+ try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+ switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+ case TAR:
+ try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+ SegmentPushUtils.pushSegments(
+ spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ case URI:
Review Comment:
I feel that we may not want to allow URI mode here; it does not make much
sense to use URI mode in minion tasks. WDYT?
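To illustrate the suggestion, here is a rough sketch of rejecting URI mode up front instead of handling it in the switch. This is only an assumption about how it could look, not the PR's code: `PushModeCheck`, its nested `PushMode` enum (a stand-in for `BatchConfigProperties.SegmentPushType`, limited to the two values visible in the diff above), and `resolvePushMode` are hypothetical names, and the choice of `UnsupportedOperationException` is just one option.

```java
import java.util.Locale;

public class PushModeCheck {
  // Hypothetical stand-in for BatchConfigProperties.SegmentPushType.
  // Only the two values that appear in the diff are modeled here.
  public enum PushMode { TAR, URI }

  // Parses the configured push mode (case-insensitive) and rejects URI mode.
  // Unknown values fail with IllegalArgumentException from Enum.valueOf.
  public static PushMode resolvePushMode(String configured) {
    PushMode mode = PushMode.valueOf(configured.toUpperCase(Locale.ROOT));
    if (mode == PushMode.URI) {
      throw new UnsupportedOperationException("URI push mode is not supported in minion tasks");
    }
    return mode;
  }

  public static void main(String[] args) {
    // Accepted mode: parsed and returned.
    System.out.println(resolvePushMode("tar"));
    // Rejected mode: fails fast before any push is attempted.
    try {
      resolvePushMode("uri");
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Failing fast like this would surface a misconfigured task before any segment is built or uploaded, but whether that is preferable to silently falling back to TAR is open for discussion.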
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -82,6 +84,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
private static final String DEFAULT_BUCKET_PERIOD = "1d";
private static final String DEFAULT_BUFFER_PERIOD = "2d";
+ private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =
Review Comment:
It seems that this variable is not used?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]