This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e63ef628f3 Move the logic to modify default value for consuming
segment inside RealtimeTableDataManager (#10086)
e63ef628f3 is described below
commit e63ef628f32578af3197392b7e94afe7d91f5983
Author: Saurabh Dubey <[email protected]>
AuthorDate: Tue Jan 10 22:30:38 2023 +0530
Move the logic to modify default value for consuming segment inside
RealtimeTableDataManager (#10086)
Co-authored-by: Saurabh Dubey <[email protected]>
---
.../core/data/manager/BaseTableDataManager.java | 2 +-
.../manager/realtime/RealtimeTableDataManager.java | 37 +++++++++++++
.../realtime/RealtimeTableDataManagerTest.java | 28 ++++++++++
.../starter/helix/HelixInstanceDataManager.java | 40 --------------
.../helix/HelixInstanceDataManagerTest.java | 63 ----------------------
5 files changed, 66 insertions(+), 104 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index ac8a717670..ff5a7693f6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class BaseTableDataManager implements TableDataManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BaseTableDataManager.class);
+ protected static final Logger LOGGER =
LoggerFactory.getLogger(BaseTableDataManager.class);
protected final ConcurrentHashMap<String, SegmentDataManager>
_segmentDataManagerMap = new ConcurrentHashMap<>();
// Semaphore to restrict the maximum number of parallel segment downloads
for a table.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index fb53de64ea..0527aac163 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.data.manager.realtime;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
@@ -37,6 +38,7 @@ import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
@@ -72,10 +74,13 @@ import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.spi.utils.TimeUtils;
import static
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
@@ -390,6 +395,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
throw new RuntimeException("Mismatching schema/table config for " +
_tableNameWithType);
}
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema,
segmentName);
+ setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata);
if (!isHLCSegment) {
// Generates only one semaphore for every partitionGroupId
@@ -429,6 +435,37 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
}
+ /**
+ * Sets the default time value in the schema as the segment creation time if
it is invalid. Time column is used to
+ * manage the segments, so its values have to be within the valid range.
+ */
+ @VisibleForTesting
+ static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema
schema, SegmentZKMetadata zkMetadata) {
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (StringUtils.isEmpty(timeColumnName)) {
+ return;
+ }
+ DateTimeFieldSpec timeColumnSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ Preconditions.checkState(timeColumnSpec != null, "Failed to find time
field: %s from schema: %s", timeColumnName,
+ schema.getSchemaName());
+ String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
+ DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
+ try {
+ long defaultTimeMs =
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
+ if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
+ return;
+ }
+ } catch (Exception e) {
+ // Ignore
+ }
+ String creationTimeString =
dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime());
+ Object creationTime =
timeColumnSpec.getDataType().convert(creationTimeString);
+ timeColumnSpec.setDefaultNullValue(creationTime);
+ LOGGER.info(
+ "Default time: {} does not comply with format: {}, using creation
time: {} as the default time for table: {}",
+ defaultTimeString, timeColumnSpec.getFormat(), creationTime,
tableConfig.getTableName());
+ }
+
@Override
public void addSegment(ImmutableSegment immutableSegment) {
if (isUpsertEnabled()) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index dbb4d0a86d..392e562fac 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -52,6 +53,8 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -61,6 +64,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -221,4 +225,28 @@ public class RealtimeTableDataManagerTest {
AccessOption.PERSISTENT)).thenReturn(schemaZNRecord);
return schema;
}
+
+ @Test
+ public void testSetDefaultTimeValueIfInvalid() {
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ long currentTimeMs = System.currentTimeMillis();
+ when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs);
+
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+ .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP, "TIMESTAMP",
"1:MILLISECONDS").build();
+ RealtimeTableDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema,
segmentZKMetadata);
+ DateTimeFieldSpec timeFieldSpec =
schema.getSpecForTimeColumn("timeColumn");
+ assertNotNull(timeFieldSpec);
+ assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs);
+
+ schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+ .addDateTime("timeColumn", FieldSpec.DataType.INT,
"SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build();
+ RealtimeTableDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema,
segmentZKMetadata);
+ timeFieldSpec = schema.getSpecForTimeColumn("timeColumn");
+ assertNotNull(timeFieldSpec);
+ assertEquals(timeFieldSpec.getDefaultNullValue(),
+
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
+ }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 25abdfc9d7..a52d627aac 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.server.starter.helix;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
@@ -67,11 +65,8 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,43 +185,11 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
realtimeTableName, segmentName);
Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata
for segment: %s, table: %s", segmentName,
realtimeTableName);
- setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
_tableDataManagerMap.computeIfAbsent(realtimeTableName, k ->
createTableDataManager(k, tableConfig))
.addSegment(segmentName, new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),
zkMetadata);
LOGGER.info("Added segment: {} to table: {}", segmentName,
realtimeTableName);
}
- /**
- * Sets the default time value in the schema as the segment creation time if
it is invalid. Time column is used to
- * manage the segments, so its values have to be within the valid range.
- */
- @VisibleForTesting
- static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema
schema, SegmentZKMetadata zkMetadata) {
- String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
- if (StringUtils.isEmpty(timeColumnName)) {
- return;
- }
- DateTimeFieldSpec timeColumnSpec =
schema.getSpecForTimeColumn(timeColumnName);
- Preconditions.checkState(timeColumnSpec != null, "Failed to find time
field: %s from schema: %s", timeColumnName,
- schema.getSchemaName());
- String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
- DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
- try {
- long defaultTimeMs =
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
- if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
- return;
- }
- } catch (Exception e) {
- // Ignore
- }
- String creationTimeString =
dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime());
- Object creationTime =
timeColumnSpec.getDataType().convert(creationTimeString);
- timeColumnSpec.setDefaultNullValue(creationTime);
- LOGGER.info(
- "Default time: {} does not comply with format: {}, using creation
time: {} as the default time for table: {}",
- defaultTimeString, timeColumnSpec.getFormat(), creationTime,
tableConfig.getTableName());
- }
-
private TableDataManager createTableDataManager(String tableNameWithType,
TableConfig tableConfig) {
LOGGER.info("Creating table data manager for table: {}",
tableNameWithType);
TableDataManagerConfig tableDataManagerConfig = new
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
@@ -476,9 +439,6 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentName);
Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata
for segment: %s, table: %s", segmentName,
tableNameWithType);
- if (schema != null) {
- setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
- }
// This method might modify the file on disk. Use segment lock to prevent
race condition
Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType,
segmentName);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
deleted file mode 100644
index f6351c505e..0000000000
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.helix;
-
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-
-public class HelixInstanceDataManagerTest {
-
- @Test
- public void testSetDefaultTimeValueIfInvalid() {
- SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
- long currentTimeMs = System.currentTimeMillis();
- when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs);
-
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build();
- Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
- .addDateTime("timeColumn", DataType.TIMESTAMP, "TIMESTAMP",
"1:MILLISECONDS").build();
- HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema,
segmentZKMetadata);
- DateTimeFieldSpec timeFieldSpec =
schema.getSpecForTimeColumn("timeColumn");
- assertNotNull(timeFieldSpec);
- assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs);
-
- schema = new Schema.SchemaBuilder().setSchemaName("testTable")
- .addDateTime("timeColumn", DataType.INT,
"SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build();
- HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema,
segmentZKMetadata);
- timeFieldSpec = schema.getSpecForTimeColumn("timeColumn");
- assertNotNull(timeFieldSpec);
- assertEquals(timeFieldSpec.getDefaultNullValue(),
-
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]