This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d2d74d1a01 Decouple dependency on using peer segment download scheme
for doing metadata pushes to controller (#10136)
d2d74d1a01 is described below
commit d2d74d1a0187ff11b73bb2c3252550a68010960b
Author: Navina Ramesh <[email protected]>
AuthorDate: Fri Jan 20 17:28:20 2023 -0800
Decouple dependency on using peer segment download scheme for doing
metadata pushes to controller (#10136)
---
.../realtime/PeerSchemeSplitSegmentCommitter.java | 48 --------
.../manager/realtime/SegmentCommitterFactory.java | 34 ++++--
.../manager/realtime/SplitSegmentCommitter.java | 23 +++-
.../realtime/SegmentCommitterFactoryTest.java | 124 +++++++++++++++++++++
.../org/apache/pinot/spi/stream/StreamConfig.java | 20 +++-
.../pinot/spi/stream/StreamConfigProperties.java | 5 +
6 files changed, 194 insertions(+), 60 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
deleted file mode 100644
index 70d9de86d7..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import java.io.File;
-import java.net.URI;
-import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.StringUtil;
-import org.slf4j.Logger;
-
-
-public class PeerSchemeSplitSegmentCommitter extends SplitSegmentCommitter {
- public PeerSchemeSplitSegmentCommitter(Logger segmentLogger,
ServerSegmentCompletionProtocolHandler protocolHandler,
- SegmentCompletionProtocol.Request.Params params, SegmentUploader
segmentUploader) {
- super(segmentLogger, protocolHandler, params, segmentUploader);
- }
-
- // Always return a uri string even if the segment upload fails and returns a
null uri.
- // If the segment upload fails, put peer:///segment_name in the segment
location to notify the controller it is a
- // peer download scheme.
- protected String uploadSegment(File segmentTarFile, SegmentUploader
segmentUploader,
- SegmentCompletionProtocol.Request.Params params) {
- URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new
LLCSegmentName(params.getSegmentName()));
- if (segmentLocation == null) {
- return StringUtil.join("/",
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, params.getSegmentName());
- }
- return segmentLocation.toString();
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 30e4d1d92f..5a52103efc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -24,6 +24,8 @@ import
org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
@@ -34,6 +36,7 @@ public class SegmentCommitterFactory {
private static Logger _logger;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
private final TableConfig _tableConfig;
+ private final StreamConfig _streamConfig;
private final ServerMetrics _serverMetrics;
private final IndexLoadingConfig _indexLoadingConfig;
@@ -42,10 +45,20 @@ public class SegmentCommitterFactory {
_logger = segmentLogger;
_protocolHandler = protocolHandler;
_tableConfig = tableConfig;
+ _streamConfig = new StreamConfig(_tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(_tableConfig));
_indexLoadingConfig = indexLoadingConfig;
_serverMetrics = serverMetrics;
}
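+ /**
+ * Creates the segment committer to use for a segment completion request.
+ * @param isSplitCommit Indicates if the controller has enabled split commit
+ * @param params Parameters to use in the Segment completion request
+ * @param controllerVipUrl Controller VIP URL used to build the upload URL when uploading through the controller
+ * @return a DefaultSegmentCommitter if split commit is not enabled, otherwise a SplitSegmentCommitter
+ * @throws URISyntaxException if a URI needed to create the committer cannot be parsed
+ */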
public SegmentCommitter createSegmentCommitter(boolean isSplitCommit,
SegmentCompletionProtocol.Request.Params params,
String controllerVipUrl)
throws URISyntaxException {
@@ -53,18 +66,21 @@ public class SegmentCommitterFactory {
return new DefaultSegmentCommitter(_logger, _protocolHandler, params);
}
SegmentUploader segmentUploader;
- // TODO Instead of using a peer segment download scheme to control how the
servers do split commit, we should use
- // other configs such as server or controller configs or controller
responses to the servers.
- if (_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() !=
null) {
+
+ boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
+ String peerSegmentDownloadScheme =
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
+ // TODO: peerSegmentDownloadScheme non-null check is for backwards
compatibility; remove once users have migrated
+ if (uploadToFs || peerSegmentDownloadScheme != null) {
segmentUploader = new
PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
- return new PeerSchemeSplitSegmentCommitter(_logger, _protocolHandler,
params, segmentUploader);
+ } else {
+ segmentUploader = new Server2ControllerSegmentUploader(_logger,
+ _protocolHandler.getFileUploadDownloadClient(),
+ _protocolHandler.getSegmentCommitUploadURL(params,
controllerVipUrl), params.getSegmentName(),
+
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
_serverMetrics,
+ _protocolHandler.getAuthProvider());
}
- segmentUploader = new Server2ControllerSegmentUploader(_logger,
_protocolHandler.getFileUploadDownloadClient(),
- _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl),
params.getSegmentName(),
-
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
_serverMetrics,
- _protocolHandler.getAuthProvider());
- return new SplitSegmentCommitter(_logger, _protocolHandler, params,
segmentUploader);
+ return new SplitSegmentCommitter(_logger, _protocolHandler, params,
segmentUploader, peerSegmentDownloadScheme);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 33b2ac0093..7f3d3ea1fb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -18,11 +18,15 @@
*/
package org.apache.pinot.core.data.manager.realtime;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.net.URI;
+import javax.annotation.Nullable;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
@@ -34,14 +38,27 @@ public class SplitSegmentCommitter implements
SegmentCommitter {
private final SegmentCompletionProtocol.Request.Params _params;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
private final SegmentUploader _segmentUploader;
+ private final String _peerDownloadScheme;
private final Logger _segmentLogger;
public SplitSegmentCommitter(Logger segmentLogger,
ServerSegmentCompletionProtocolHandler protocolHandler,
- SegmentCompletionProtocol.Request.Params params, SegmentUploader
segmentUploader) {
+ SegmentCompletionProtocol.Request.Params params, SegmentUploader
segmentUploader,
+ @Nullable String peerDownloadScheme) {
_segmentLogger = segmentLogger;
_protocolHandler = protocolHandler;
_params = new SegmentCompletionProtocol.Request.Params(params);
_segmentUploader = segmentUploader;
+ _peerDownloadScheme = peerDownloadScheme;
+ }
+
+ @VisibleForTesting
+ SegmentUploader getSegmentUploader() {
+ return _segmentUploader;
+ }
+
+ public SplitSegmentCommitter(Logger segmentLogger,
ServerSegmentCompletionProtocolHandler protocolHandler,
+ SegmentCompletionProtocol.Request.Params params, SegmentUploader
segmentUploader) {
+ this(segmentLogger, protocolHandler, params, segmentUploader, null);
}
@Override
@@ -79,6 +96,10 @@ public class SplitSegmentCommitter implements
SegmentCommitter {
if (segmentLocation != null) {
return segmentLocation.toString();
}
+ if (_peerDownloadScheme != null) {
+ return StringUtil.join("/",
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME,
+ params.getSegmentName());
+ }
return null;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
new file mode 100644
index 0000000000..c815d48a94
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.collect.ImmutableMap;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentCommitterFactoryTest {
+
+ private Map<String, String> getMinimumStreamConfigMap() {
+ return ImmutableMap.of(
+ "streamType", "kafka",
+ "stream.kafka.consumer.type", "simple",
+ "stream.kafka.topic.name", "ignore",
+ "stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.inputformat.json.JsonMessageDecoder");
+ }
+
+ private TableConfigBuilder createRealtimeTableConfig(String tableName) {
+ return createRealtimeTableConfig(tableName, getMinimumStreamConfigMap());
+ }
+
+ private TableConfigBuilder createRealtimeTableConfig(String tableName,
Map<String, String> realtimeStreamConfig) {
+ return new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(tableName)
+ .setLLC(true)
+ .setStreamConfigs(realtimeStreamConfig);
+ }
+
+ @Test (description = "if controller doesn't support split commit, it should
return default segment committer")
+ public void testControllerNoSplit()
+ throws URISyntaxException {
+ TableConfig config = createRealtimeTableConfig("test").build();
+
+ SegmentCommitterFactory factory = new
SegmentCommitterFactory(Mockito.mock(Logger.class),
+ Mockito.mock(ServerSegmentCompletionProtocolHandler.class), config,
+ Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ SegmentCommitter committer = factory.createSegmentCommitter(false, null,
null);
+ Assert.assertNotNull(committer);
+ Assert.assertTrue(committer instanceof DefaultSegmentCommitter);
+ }
+
+ @Test(description = "when controller supports split commit, server should
always use split segment commit")
+ public void testSplitSegmentCommitterIsDefault()
+ throws URISyntaxException {
+ TableConfig config = createRealtimeTableConfig("test").build();
+ ServerSegmentCompletionProtocolHandler protocolHandler =
+ new
ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class),
"test_REALTIME");
+ String controllerVipUrl = "http://localhost:1234";
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
+ SegmentCommitterFactory factory = new
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
+ Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ SegmentCommitter committer = factory.createSegmentCommitter(true,
requestParams, controllerVipUrl);
+ Assert.assertNotNull(committer);
+ Assert.assertTrue(committer instanceof SplitSegmentCommitter);
+ }
+
+ @Test(description = "use upload to deepstore when either
serverUploadToDeepStore is set or peer segment download "
+ + "scheme is non-null")
+ public void testUploadToDeepStoreConfig()
+ throws URISyntaxException {
+ ServerSegmentCompletionProtocolHandler protocolHandler =
+ new
ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class),
"test_REALTIME");
+ String controllerVipUrl = "http://localhost:1234";
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
+
+ // No peer segment download scheme, serverUploadToDeepStore = true
+ Map<String, String> streamConfigMap = new
HashMap<>(getMinimumStreamConfigMap());
+ streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
"true");
+ TableConfig config = createRealtimeTableConfig("testDeepStoreConfig",
streamConfigMap).build();
+
+ SegmentCommitterFactory factory = new
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
+ Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ SegmentCommitter committer = factory.createSegmentCommitter(true,
requestParams, controllerVipUrl);
+ Assert.assertNotNull(committer);
+ Assert.assertTrue(committer instanceof SplitSegmentCommitter);
+ Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader()
instanceof PinotFSSegmentUploader);
+
+ // Peer segment download scheme is set, serverUploadToDeepStore = false
(for backwards compatibility)
+ Map<String, String> streamConfigMap1 = new
HashMap<>(getMinimumStreamConfigMap());
+ streamConfigMap1.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
"false");
+ TableConfig config1 = createRealtimeTableConfig("testDeepStoreConfig",
streamConfigMap1)
+ .setPeerSegmentDownloadScheme("http")
+ .build();
+
+ factory = new SegmentCommitterFactory(Mockito.mock(Logger.class),
protocolHandler, config1,
+ Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ committer = factory.createSegmentCommitter(true, requestParams,
controllerVipUrl);
+ Assert.assertNotNull(committer);
+ Assert.assertTrue(committer instanceof SplitSegmentCommitter);
+ Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader()
instanceof PinotFSSegmentUploader);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index a2249dec4d..66c40636ec 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -50,6 +50,7 @@ public class StreamConfig {
public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS =
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 *
1024 * 1024; // 200M
public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
+ public static final String DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE = "false";
public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";
@@ -89,6 +90,11 @@ public class StreamConfig {
// Allow overriding it to use different offset criteria
private OffsetCriteria _offsetCriteria;
+ // Indicates if the segment should be uploaded to the deep store's file
system or to the controller during the
+ // segment commit protocol. By default, segment is uploaded to the
controller during commit.
+ // If this flag is set to true, the segment is uploaded to deep store.
+ private final boolean _serverUploadToDeepStore;
+
/**
* Initializes a StreamConfig using the map of stream configs from the table
config
*/
@@ -191,6 +197,9 @@ public class StreamConfig {
_flushThresholdRows = extractFlushThresholdRows(streamConfigMap);
_flushThresholdTimeMillis =
extractFlushThresholdTimeMillis(streamConfigMap);
_flushThresholdSegmentSizeBytes =
extractFlushThresholdSegmentSize(streamConfigMap);
+ _serverUploadToDeepStore = Boolean.parseBoolean(
+
streamConfigMap.getOrDefault(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
+ DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE));
int autotuneInitialRows = 0;
String initialRowsValue =
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -214,6 +223,10 @@ public class StreamConfig {
_streamConfigMap.putAll(streamConfigMap);
}
+ public boolean isServerUploadToDeepStore() {
+ return _serverUploadToDeepStore;
+ }
+
private long extractFlushThresholdSegmentSize(Map<String, String>
streamConfigMap) {
long segmentSizeBytes = -1;
String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
@@ -378,7 +391,8 @@ public class StreamConfig {
+ ", _flushSegmentDesiredSizeBytes=" + _flushThresholdSegmentSizeBytes
+ ", _flushAutotuneInitialRows="
+ _flushAutotuneInitialRows + ", _decoderClass='" + _decoderClass +
'\'' + ", _decoderProperties="
+ _decoderProperties + ", _groupId='" + _groupId + "',
_topicConsumptionRateLimit=" + _topicConsumptionRateLimit
- + ", _tableNameWithType='" + _tableNameWithType + '}';
+ + ", _tableNameWithType='" + _tableNameWithType + ",
_serverUploadToDeepStore=" + _serverUploadToDeepStore
+ + "}";
}
@Override
@@ -405,7 +419,8 @@ public class StreamConfig {
that._decoderClass) && EqualityUtils.isEqual(_decoderProperties,
that._decoderProperties)
&& EqualityUtils.isEqual(_groupId, that._groupId) &&
EqualityUtils.isEqual(_tableNameWithType,
that._tableNameWithType) &&
EqualityUtils.isEqual(_topicConsumptionRateLimit,
that._topicConsumptionRateLimit)
- && EqualityUtils.isEqual(_streamConfigMap, that._streamConfigMap);
+ && EqualityUtils.isEqual(_streamConfigMap, that._streamConfigMap)
+ && _serverUploadToDeepStore == that._serverUploadToDeepStore;
}
@Override
@@ -428,6 +443,7 @@ public class StreamConfig {
result = EqualityUtils.hashCodeOf(result, _topicConsumptionRateLimit);
result = EqualityUtils.hashCodeOf(result, _streamConfigMap);
result = EqualityUtils.hashCodeOf(result, _tableNameWithType);
+ result = EqualityUtils.hashCodeOf(result, _serverUploadToDeepStore);
return result;
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 69c091a5a4..1cc9548a32 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -111,6 +111,11 @@ public class StreamConfigProperties {
// Time threshold that controller will wait for the segment to be built by
the server
public static final String SEGMENT_COMMIT_TIMEOUT_SECONDS =
"realtime.segment.commit.timeoutSeconds";
+ /**
+ * Config used to indicate whether server should by-pass controller and
directly upload the segment to the deep store
+ */
+ public static final String SERVER_UPLOAD_TO_DEEPSTORE =
"realtime.segment.serverUploadToDeepStore";
+
/**
* Helper method to create a stream specific property
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]