This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d2d74d1a01 Decouple dependency on using peer segment download scheme 
for doing metadata pushes to controller (#10136)
d2d74d1a01 is described below

commit d2d74d1a0187ff11b73bb2c3252550a68010960b
Author: Navina Ramesh <[email protected]>
AuthorDate: Fri Jan 20 17:28:20 2023 -0800

    Decouple dependency on using peer segment download scheme for doing 
metadata pushes to controller (#10136)
---
 .../realtime/PeerSchemeSplitSegmentCommitter.java  |  48 --------
 .../manager/realtime/SegmentCommitterFactory.java  |  34 ++++--
 .../manager/realtime/SplitSegmentCommitter.java    |  23 +++-
 .../realtime/SegmentCommitterFactoryTest.java      | 124 +++++++++++++++++++++
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  20 +++-
 .../pinot/spi/stream/StreamConfigProperties.java   |   5 +
 6 files changed, 194 insertions(+), 60 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
deleted file mode 100644
index 70d9de86d7..0000000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import java.io.File;
-import java.net.URI;
-import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.StringUtil;
-import org.slf4j.Logger;
-
-
-public class PeerSchemeSplitSegmentCommitter extends SplitSegmentCommitter {
-  public PeerSchemeSplitSegmentCommitter(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler,
-      SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader) {
-    super(segmentLogger, protocolHandler, params, segmentUploader);
-  }
-
-  // Always return a uri string even if the segment upload fails and returns a 
null uri.
-  // If the segment upload fails, put peer:///segment_name in the segment 
location to notify the controller it is a
-  // peer download scheme.
-  protected String uploadSegment(File segmentTarFile, SegmentUploader 
segmentUploader,
-      SegmentCompletionProtocol.Request.Params params) {
-    URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new 
LLCSegmentName(params.getSegmentName()));
-    if (segmentLocation == null) {
-      return StringUtil.join("/", 
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, params.getSegmentName());
-    }
-    return segmentLocation.toString();
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 30e4d1d92f..5a52103efc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -24,6 +24,8 @@ import 
org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 
 
@@ -34,6 +36,7 @@ public class SegmentCommitterFactory {
   private static Logger _logger;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
   private final TableConfig _tableConfig;
+  private final StreamConfig _streamConfig;
   private final ServerMetrics _serverMetrics;
   private final IndexLoadingConfig _indexLoadingConfig;
 
@@ -42,10 +45,20 @@ public class SegmentCommitterFactory {
     _logger = segmentLogger;
     _protocolHandler = protocolHandler;
     _tableConfig = tableConfig;
+    _streamConfig = new StreamConfig(_tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(_tableConfig));
     _indexLoadingConfig = indexLoadingConfig;
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   *
+   * @param isSplitCommit Indicates if the controller has enabled split commit
+   * @param params Parameters to use in the Segment completion request
+   * @param controllerVipUrl Unused,
+   * @return
+   * @throws URISyntaxException
+   */
   public SegmentCommitter createSegmentCommitter(boolean isSplitCommit, 
SegmentCompletionProtocol.Request.Params params,
       String controllerVipUrl)
       throws URISyntaxException {
@@ -53,18 +66,21 @@ public class SegmentCommitterFactory {
       return new DefaultSegmentCommitter(_logger, _protocolHandler, params);
     }
     SegmentUploader segmentUploader;
-    // TODO Instead of using a peer segment download scheme to control how the 
servers do split commit, we should use
-    // other configs such as server or controller configs or controller 
responses to the servers.
-    if (_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != 
null) {
+
+    boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
+    String peerSegmentDownloadScheme = 
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
+    // TODO: exists for backwards compatibility. remove peerDownloadScheme 
non-null check once users have migrated
+    if (uploadToFs || peerSegmentDownloadScheme != null) {
       segmentUploader = new 
PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
           PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
-      return new PeerSchemeSplitSegmentCommitter(_logger, _protocolHandler, 
params, segmentUploader);
+    } else {
+      segmentUploader = new Server2ControllerSegmentUploader(_logger,
+          _protocolHandler.getFileUploadDownloadClient(),
+          _protocolHandler.getSegmentCommitUploadURL(params, 
controllerVipUrl), params.getSegmentName(),
+          
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics,
+          _protocolHandler.getAuthProvider());
     }
 
-    segmentUploader = new Server2ControllerSegmentUploader(_logger, 
_protocolHandler.getFileUploadDownloadClient(),
-        _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), 
params.getSegmentName(),
-        
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics,
-        _protocolHandler.getAuthProvider());
-    return new SplitSegmentCommitter(_logger, _protocolHandler, params, 
segmentUploader);
+    return new SplitSegmentCommitter(_logger, _protocolHandler, params, 
segmentUploader, peerSegmentDownloadScheme);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 33b2ac0093..7f3d3ea1fb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.net.URI;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 
 
@@ -34,14 +38,27 @@ public class SplitSegmentCommitter implements 
SegmentCommitter {
   private final SegmentCompletionProtocol.Request.Params _params;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
   private final SegmentUploader _segmentUploader;
+  private final String _peerDownloadScheme;
   private final Logger _segmentLogger;
 
   public SplitSegmentCommitter(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler,
-      SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader) {
+      SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader,
+      @Nullable String peerDownloadScheme) {
     _segmentLogger = segmentLogger;
     _protocolHandler = protocolHandler;
     _params = new SegmentCompletionProtocol.Request.Params(params);
     _segmentUploader = segmentUploader;
+    _peerDownloadScheme = peerDownloadScheme;
+  }
+
+  @VisibleForTesting
+  SegmentUploader getSegmentUploader() {
+    return _segmentUploader;
+  }
+
+  public SplitSegmentCommitter(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler,
+      SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader) {
+    this(segmentLogger, protocolHandler, params, segmentUploader, null);
   }
 
   @Override
@@ -79,6 +96,10 @@ public class SplitSegmentCommitter implements 
SegmentCommitter {
     if (segmentLocation != null) {
       return segmentLocation.toString();
     }
+    if (_peerDownloadScheme != null) {
+        return StringUtil.join("/", 
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME,
+            params.getSegmentName());
+    }
     return null;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
new file mode 100644
index 0000000000..c815d48a94
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.collect.ImmutableMap;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentCommitterFactoryTest {
+
+  private Map<String, String> getMinimumStreamConfigMap() {
+    return ImmutableMap.of(
+        "streamType", "kafka",
+        "stream.kafka.consumer.type", "simple",
+        "stream.kafka.topic.name", "ignore",
+        "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.inputformat.json.JsonMessageDecoder");
+  }
+
+  private TableConfigBuilder createRealtimeTableConfig(String tableName) {
+    return createRealtimeTableConfig(tableName, getMinimumStreamConfigMap());
+  }
+
+  private TableConfigBuilder createRealtimeTableConfig(String tableName, 
Map<String, String> realtimeStreamConfig) {
+    return new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(tableName)
+        .setLLC(true)
+        .setStreamConfigs(realtimeStreamConfig);
+  }
+
+  @Test (description = "if controller doesn't support split commit, it should 
return default segment committer")
+  public void testControllerNoSplit()
+      throws URISyntaxException {
+    TableConfig config = createRealtimeTableConfig("test").build();
+
+    SegmentCommitterFactory factory = new 
SegmentCommitterFactory(Mockito.mock(Logger.class),
+        Mockito.mock(ServerSegmentCompletionProtocolHandler.class), config,
+        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+    SegmentCommitter committer = factory.createSegmentCommitter(false, null, 
null);
+    Assert.assertNotNull(committer);
+    Assert.assertTrue(committer instanceof DefaultSegmentCommitter);
+  }
+
+  @Test(description = "when controller supports split commit, server should 
always use split segment commit")
+  public void testSplitSegmentCommitterIsDefault()
+      throws URISyntaxException {
+    TableConfig config = createRealtimeTableConfig("test").build();
+    ServerSegmentCompletionProtocolHandler protocolHandler =
+        new 
ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class), 
"test_REALTIME");
String controllerVipUrl = "http://localhost:1234";
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+    SegmentCommitterFactory factory = new 
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
+        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+    SegmentCommitter committer = factory.createSegmentCommitter(true, 
requestParams, controllerVipUrl);
+    Assert.assertNotNull(committer);
+    Assert.assertTrue(committer instanceof SplitSegmentCommitter);
+  }
+
+  @Test(description = "use upload to deepstore when either 
serverUploadToDeepStore is set or peer segment download "
+      + "scheme is non-null")
+  public void testUploadToDeepStoreConfig()
+      throws URISyntaxException {
+    ServerSegmentCompletionProtocolHandler protocolHandler =
+        new 
ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class), 
"test_REALTIME");
String controllerVipUrl = "http://localhost:1234";
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+
+    // No peer segment download scheme, serverUploadToDeepStore = true
+    Map<String, String> streamConfigMap = new 
HashMap<>(getMinimumStreamConfigMap());
+    streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
"true");
+    TableConfig config = createRealtimeTableConfig("testDeepStoreConfig", 
streamConfigMap).build();
+
+    SegmentCommitterFactory factory = new 
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
+        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+    SegmentCommitter committer = factory.createSegmentCommitter(true, 
requestParams, controllerVipUrl);
+    Assert.assertNotNull(committer);
+    Assert.assertTrue(committer instanceof SplitSegmentCommitter);
+    Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader() 
instanceof PinotFSSegmentUploader);
+
+    // Peer segment download scheme is set, serverUploadToDeepStore = false 
(for backwards compatibility)
+    Map<String, String> streamConfigMap1 = new 
HashMap<>(getMinimumStreamConfigMap());
+    streamConfigMap1.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
"false");
+    TableConfig config1 = createRealtimeTableConfig("testDeepStoreConfig", 
streamConfigMap1)
+        .setPeerSegmentDownloadScheme("http")
+        .build();
+
+    factory = new SegmentCommitterFactory(Mockito.mock(Logger.class), 
protocolHandler, config1,
+        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+    committer = factory.createSegmentCommitter(true, requestParams, 
controllerVipUrl);
+    Assert.assertNotNull(committer);
+    Assert.assertTrue(committer instanceof SplitSegmentCommitter);
+    Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader() 
instanceof PinotFSSegmentUploader);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index a2249dec4d..66c40636ec 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -50,6 +50,7 @@ public class StreamConfig {
   public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS = 
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
   public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 * 
1024 * 1024; // 200M
   public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
+  public static final String DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE = "false";
 
   public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
       "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";
@@ -89,6 +90,11 @@ public class StreamConfig {
   // Allow overriding it to use different offset criteria
   private OffsetCriteria _offsetCriteria;
 
+  // Indicates if the segment should be uploaded to the deep store's file 
system or to the controller during the
+  // segment commit protocol. By default, segment is uploaded to the 
controller during commit.
+  // If this flag is set to true, the segment is uploaded to deep store.
+  private final boolean _serverUploadToDeepStore;
+
   /**
    * Initializes a StreamConfig using the map of stream configs from the table 
config
    */
@@ -191,6 +197,9 @@ public class StreamConfig {
     _flushThresholdRows = extractFlushThresholdRows(streamConfigMap);
     _flushThresholdTimeMillis = 
extractFlushThresholdTimeMillis(streamConfigMap);
     _flushThresholdSegmentSizeBytes = 
extractFlushThresholdSegmentSize(streamConfigMap);
+    _serverUploadToDeepStore = Boolean.parseBoolean(
+        
streamConfigMap.getOrDefault(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
+        DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE));
 
     int autotuneInitialRows = 0;
     String initialRowsValue = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -214,6 +223,10 @@ public class StreamConfig {
     _streamConfigMap.putAll(streamConfigMap);
   }
 
+  public boolean isServerUploadToDeepStore() {
+    return _serverUploadToDeepStore;
+  }
+
   private long extractFlushThresholdSegmentSize(Map<String, String> 
streamConfigMap) {
     long segmentSizeBytes = -1;
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
@@ -378,7 +391,8 @@ public class StreamConfig {
         + ", _flushSegmentDesiredSizeBytes=" + _flushThresholdSegmentSizeBytes 
+ ", _flushAutotuneInitialRows="
         + _flushAutotuneInitialRows + ", _decoderClass='" + _decoderClass + 
'\'' + ", _decoderProperties="
         + _decoderProperties + ", _groupId='" + _groupId + "', 
_topicConsumptionRateLimit=" + _topicConsumptionRateLimit
-        + ", _tableNameWithType='" + _tableNameWithType + '}';
+        + ", _tableNameWithType='" + _tableNameWithType + ", 
_serverUploadToDeepStore=" + _serverUploadToDeepStore
+        + "}";
   }
 
   @Override
@@ -405,7 +419,8 @@ public class StreamConfig {
         that._decoderClass) && EqualityUtils.isEqual(_decoderProperties, 
that._decoderProperties)
         && EqualityUtils.isEqual(_groupId, that._groupId) && 
EqualityUtils.isEqual(_tableNameWithType,
         that._tableNameWithType) && 
EqualityUtils.isEqual(_topicConsumptionRateLimit, 
that._topicConsumptionRateLimit)
-        && EqualityUtils.isEqual(_streamConfigMap, that._streamConfigMap);
+        && EqualityUtils.isEqual(_streamConfigMap, that._streamConfigMap)
+        && _serverUploadToDeepStore == that._serverUploadToDeepStore;
   }
 
   @Override
@@ -428,6 +443,7 @@ public class StreamConfig {
     result = EqualityUtils.hashCodeOf(result, _topicConsumptionRateLimit);
     result = EqualityUtils.hashCodeOf(result, _streamConfigMap);
     result = EqualityUtils.hashCodeOf(result, _tableNameWithType);
+    result = EqualityUtils.hashCodeOf(result, _serverUploadToDeepStore);
     return result;
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 69c091a5a4..1cc9548a32 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -111,6 +111,11 @@ public class StreamConfigProperties {
   // Time threshold that controller will wait for the segment to be built by 
the server
   public static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = 
"realtime.segment.commit.timeoutSeconds";
 
+  /**
+   * Config used to indicate whether server should by-pass controller and 
directly upload the segment to the deep store
+   */
+  public static final String SERVER_UPLOAD_TO_DEEPSTORE = 
"realtime.segment.serverUploadToDeepStore";
+
   /**
    * Helper method to create a stream specific property
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to