This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ba0b016 Refactor PinotSinkIntegrationTest to have consistent table
config and schema (#8322)
ba0b016 is described below
commit ba0b0163a2687bbc69a5ef4081c6f72a1c966c38
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 8 19:50:05 2022 -0800
Refactor PinotSinkIntegrationTest to have consistent table config and
schema (#8322)
---
.../flink/sink/PinotSinkIntegrationTest.java | 117 ++++++++++++-------
.../pinot/connector/flink/util/PinotTestBase.java | 129 ---------------------
.../pinot/connector/flink/util/TestUtils.java | 40 -------
3 files changed, 78 insertions(+), 208 deletions(-)
diff --git
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
index 9552b5b..79c24ba 100644
---
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
+++
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
@@ -19,107 +19,146 @@
package org.apache.pinot.connector.flink.sink;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.pinot.connector.flink.common.FlinkRowGenericRowConverter;
-import org.apache.pinot.connector.flink.util.PinotTestBase;
-import org.apache.pinot.connector.flink.util.TestUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class PinotSinkIntegrationTest {
+public class PinotSinkIntegrationTest extends BaseClusterIntegrationTest {
+ public static final String RAW_TABLE_NAME = "testTable";
+ public static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
- private static PinotTestBase _testBase = new PinotTestBase();
+ private List<Row> _data;
+ public RowTypeInfo _typeInfo;
+ public TableConfig _tableConfig;
+ public Schema _schema;
@BeforeClass
- public static void setUp()
+ public void setUp()
throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_testBase.getTempDir(),
_testBase.getSegmentDir(), _testBase.getTarDir());
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
- // Start the Pinot cluster
- _testBase.startCluster();
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ _data = Arrays.asList(Row.of(1, 1L, "Hi"), Row.of(2, 2L, "Hello"),
Row.of(3, 3L, "Hello world"),
+ Row.of(4, 4L, "Hello world!"), Row.of(5, 5L, "HelloWorld"), Row.of(6,
6L, "Hello!world!"));
+ _typeInfo =
+ new RowTypeInfo(new TypeInformation[]{Types.INT, Types.LONG,
Types.STRING}, new String[]{"a", "b", "c"});
+ Map<String, String> batchConfigs = new HashMap<>();
+ batchConfigs.put(BatchConfigProperties.OUTPUT_DIR_URI,
_tarDir.getAbsolutePath());
+ batchConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
+ batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
_controllerBaseApiUrl);
+ IngestionConfig ingestionConfig =
+ new IngestionConfig(new
BatchIngestionConfig(Collections.singletonList(batchConfigs), "APPEND",
"HOURLY"), null,
+ null, null, null);
+ _tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
+ .build();
+ _schema =
+ new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("a",
FieldSpec.DataType.INT)
+ .addSingleValueDimension("b", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("c",
FieldSpec.DataType.STRING).setPrimaryKeyColumns(Lists.newArrayList("a"))
+ .build();
}
@AfterClass
- public static void tearDown()
+ public void tearDown()
throws Exception {
- _testBase.stopCluster();
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
- FileUtils.deleteDirectory(_testBase.getTempDir());
+ FileUtils.deleteDirectory(_tempDir);
}
@Test
public void testPinotSinkWrite()
throws Exception {
+ addSchema(_schema);
+ addTableConfig(_tableConfig);
+
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(1);
- DataStream<Row> srcDs =
execEnv.fromCollection(_testBase.TEST_DATA).returns(_testBase.TEST_TYPE_INFO);
-
- TableConfig tableConfig = _testBase.createOfflineTableConfig();
- _testBase.addTableConfig(tableConfig);
- srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_testBase.TEST_TYPE_INFO), tableConfig,
- _testBase.SCHEMA));
+ DataStream<Row> srcDs = execEnv.fromCollection(_data).returns(_typeInfo);
+ srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema));
execEnv.execute();
Assert.assertEquals(getNumSegments(), 1);
Assert.assertEquals(getTotalNumDocs(), 6);
- dropTable();
+
+ deleteSchema(RAW_TABLE_NAME);
+ dropOfflineTable(OFFLINE_TABLE_NAME);
}
@Test
public void testPinotSinkParallelWrite()
throws Exception {
+ addSchema(_schema);
+ addTableConfig(_tableConfig);
+
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2);
- DataStream<Row> srcDs =
-
execEnv.fromCollection(_testBase.TEST_DATA).returns(_testBase.TEST_TYPE_INFO).keyBy(r
-> r.getField(0));
-
- TableConfig tableConfig = _testBase.createOfflineTableConfig();
- _testBase.addTableConfig(tableConfig);
- srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_testBase.TEST_TYPE_INFO), tableConfig,
- _testBase.SCHEMA));
+ DataStream<Row> srcDs =
execEnv.fromCollection(_data).returns(_typeInfo).keyBy(r -> r.getField(0));
+ srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema));
execEnv.execute();
Assert.assertEquals(getNumSegments(), 2);
Assert.assertEquals(getTotalNumDocs(), 6);
- dropTable();
- }
- private void dropTable()
- throws IOException {
- String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(_testBase.getTableName());
- _testBase.dropOfflineTable(tableNameWithType);
+ deleteSchema(RAW_TABLE_NAME);
+ dropOfflineTable(OFFLINE_TABLE_NAME);
}
private int getNumSegments()
throws IOException {
- String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(_testBase.getTableName());
- String jsonOutputStr =
_testBase.sendGetRequest(_testBase.getControllerRequestURLBuilder()
- .forSegmentListAPIWithTableType(tableNameWithType,
TableType.OFFLINE.toString()));
+ String jsonOutputStr = ControllerTest.sendGetRequest(
+
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(OFFLINE_TABLE_NAME,
TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
return array.get(0).get("OFFLINE").size();
}
private int getTotalNumDocs()
throws IOException {
- String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(_testBase.getTableName());
- String jsonOutputStr =
_testBase.sendGetRequest(_testBase.getControllerRequestURLBuilder()
- .forSegmentListAPIWithTableType(tableNameWithType,
TableType.OFFLINE.toString()));
+ String jsonOutputStr = ControllerTest.sendGetRequest(
+
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(OFFLINE_TABLE_NAME,
TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
JsonNode segments = array.get(0).get("OFFLINE");
int totalDocCount = 0;
for (int i = 0; i < segments.size(); i++) {
String segmentName = segments.get(i).asText();
- jsonOutputStr = _testBase.sendGetRequest(
-
_testBase.getControllerRequestURLBuilder().forSegmentMetadata(tableNameWithType,
segmentName));
+ jsonOutputStr = ControllerTest.sendGetRequest(
+ _controllerRequestURLBuilder.forSegmentMetadata(OFFLINE_TABLE_NAME,
segmentName));
JsonNode metadata = JsonUtils.stringToJsonNode(jsonOutputStr);
totalDocCount += metadata.get("segment.total.docs").asInt();
}
diff --git
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/PinotTestBase.java
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/PinotTestBase.java
deleted file mode 100644
index cbedece..0000000
---
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/PinotTestBase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.connector.flink.util;
-
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
-import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-
-
-public class PinotTestBase extends BaseClusterIntegrationTest {
-
- public static final List<Row> TEST_DATA = new ArrayList<>();
- public static final RowTypeInfo TEST_TYPE_INFO =
- new RowTypeInfo(new TypeInformation[]{Types.INT, Types.LONG,
Types.STRING}, new String[]{"a", "b", "c"});
- public static final Schema SCHEMA = new
Schema.SchemaBuilder().addSingleValueDimension("a", FieldSpec.DataType.INT)
- .addSingleValueDimension("b",
FieldSpec.DataType.LONG).addSingleValueDimension("c", FieldSpec.DataType.STRING)
- .setPrimaryKeyColumns(Lists.newArrayList("a")).build();
-
- static {
- TEST_DATA.add(Row.of(1, 1L, "Hi"));
- TEST_DATA.add(Row.of(2, 2L, "Hello"));
- TEST_DATA.add(Row.of(3, 3L, "Hello world"));
- TEST_DATA.add(Row.of(4, 4L, "Hello world!"));
- TEST_DATA.add(Row.of(5, 4L, "HelloWorld"));
- TEST_DATA.add(Row.of(6, 5L, "Hello!world!"));
- }
-
- public void startCluster()
- throws Exception {
- startZk();
- startController();
- startBroker();
- startServer();
- }
-
- @Override
- @Nullable
- protected IngestionConfig getIngestionConfig() {
- Map<String, String> batchConfigMap = new HashMap<>();
- batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
getTarDir().getAbsolutePath());
- batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
- batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
getControllerBaseApiUrl());
- return new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"),
null,
- null, null, null);
- }
-
- public void stopCluster()
- throws Exception {
- stopServer();
- stopBroker();
- stopController();
- stopZk();
- }
-
- public File getTempDir() {
- return _tempDir;
- }
-
- public File getSegmentDir() {
- return _segmentDir;
- }
-
- public File getTarDir() {
- return _tarDir;
- }
-
- public String getControllerBaseApiUrl() {
- return _controllerBaseApiUrl;
- }
-
- @Override
- public String getTableName() {
- return super.getTableName();
- }
-
- @Override
- public TableConfig createOfflineTableConfig() {
- return super.createOfflineTableConfig();
- }
-
- @Override
- public void addTableConfig(TableConfig tableConfig)
- throws IOException {
- super.addTableConfig(tableConfig);
- }
-
- public ControllerRequestURLBuilder getControllerRequestURLBuilder() {
- return _controllerRequestURLBuilder;
- }
-
- @Override
- public void dropOfflineTable(String tableNameWithType)
- throws IOException {
- super.dropOfflineTable(tableNameWithType);
- }
-}
diff --git
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java
deleted file mode 100644
index db749ed..0000000
---
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.connector.flink.util;
-
-import java.io.File;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-
-
-public final class TestUtils {
-
- private TestUtils() {
- }
-
- public static void ensureDirectoriesExistAndEmpty(@Nonnull File... dirs)
- throws IOException {
- for (File dir : dirs) {
- FileUtils.deleteDirectory(dir);
- Assert.assertTrue(dir.mkdirs());
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]