This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ba0b016  Refactor PinotSinkIntegrationTest to have consistent table config and schema (#8322)
ba0b016 is described below

commit ba0b0163a2687bbc69a5ef4081c6f72a1c966c38
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 8 19:50:05 2022 -0800

    Refactor PinotSinkIntegrationTest to have consistent table config and schema (#8322)
---
 .../flink/sink/PinotSinkIntegrationTest.java       | 117 ++++++++++++-------
 .../pinot/connector/flink/util/PinotTestBase.java  | 129 ---------------------
 .../pinot/connector/flink/util/TestUtils.java      |  40 -------
 3 files changed, 78 insertions(+), 208 deletions(-)

diff --git a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
index 9552b5b..79c24ba 100644
--- a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
+++ b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
@@ -19,107 +19,146 @@
 package org.apache.pinot.connector.flink.sink;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.pinot.connector.flink.common.FlinkRowGenericRowConverter;
-import org.apache.pinot.connector.flink.util.PinotTestBase;
-import org.apache.pinot.connector.flink.util.TestUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class PinotSinkIntegrationTest {
+public class PinotSinkIntegrationTest extends BaseClusterIntegrationTest {
+  public static final String RAW_TABLE_NAME = "testTable";
+  public static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
 
-  private static PinotTestBase _testBase = new PinotTestBase();
+  private List<Row> _data;
+  public RowTypeInfo _typeInfo;
+  public TableConfig _tableConfig;
+  public Schema _schema;
 
   @BeforeClass
-  public static void setUp()
+  public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_testBase.getTempDir(), 
_testBase.getSegmentDir(), _testBase.getTarDir());
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
-    // Start the Pinot cluster
-    _testBase.startCluster();
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    _data = Arrays.asList(Row.of(1, 1L, "Hi"), Row.of(2, 2L, "Hello"), 
Row.of(3, 3L, "Hello world"),
+        Row.of(4, 4L, "Hello world!"), Row.of(5, 5L, "HelloWorld"), Row.of(6, 
6L, "Hello!world!"));
+    _typeInfo =
+        new RowTypeInfo(new TypeInformation[]{Types.INT, Types.LONG, 
Types.STRING}, new String[]{"a", "b", "c"});
+    Map<String, String> batchConfigs = new HashMap<>();
+    batchConfigs.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_tarDir.getAbsolutePath());
+    batchConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
+    batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerBaseApiUrl);
+    IngestionConfig ingestionConfig =
+        new IngestionConfig(new 
BatchIngestionConfig(Collections.singletonList(batchConfigs), "APPEND", 
"HOURLY"), null,
+            null, null, null);
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
+            .build();
+    _schema =
+        new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("a",
 FieldSpec.DataType.INT)
+            .addSingleValueDimension("b", FieldSpec.DataType.LONG)
+            .addSingleValueDimension("c", 
FieldSpec.DataType.STRING).setPrimaryKeyColumns(Lists.newArrayList("a"))
+            .build();
   }
 
   @AfterClass
-  public static void tearDown()
+  public void tearDown()
       throws Exception {
-    _testBase.stopCluster();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
 
-    FileUtils.deleteDirectory(_testBase.getTempDir());
+    FileUtils.deleteDirectory(_tempDir);
   }
 
   @Test
   public void testPinotSinkWrite()
       throws Exception {
+    addSchema(_schema);
+    addTableConfig(_tableConfig);
+
     StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.setParallelism(1);
-    DataStream<Row> srcDs = 
execEnv.fromCollection(_testBase.TEST_DATA).returns(_testBase.TEST_TYPE_INFO);
-
-    TableConfig tableConfig = _testBase.createOfflineTableConfig();
-    _testBase.addTableConfig(tableConfig);
-    srcDs.addSink(new PinotSinkFunction<>(new 
FlinkRowGenericRowConverter(_testBase.TEST_TYPE_INFO), tableConfig,
-        _testBase.SCHEMA));
+    DataStream<Row> srcDs = execEnv.fromCollection(_data).returns(_typeInfo);
+    srcDs.addSink(new PinotSinkFunction<>(new 
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema));
     execEnv.execute();
     Assert.assertEquals(getNumSegments(), 1);
     Assert.assertEquals(getTotalNumDocs(), 6);
-    dropTable();
+
+    deleteSchema(RAW_TABLE_NAME);
+    dropOfflineTable(OFFLINE_TABLE_NAME);
   }
 
   @Test
   public void testPinotSinkParallelWrite()
       throws Exception {
+    addSchema(_schema);
+    addTableConfig(_tableConfig);
+
     StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.setParallelism(2);
-    DataStream<Row> srcDs =
-        
execEnv.fromCollection(_testBase.TEST_DATA).returns(_testBase.TEST_TYPE_INFO).keyBy(r
 -> r.getField(0));
-
-    TableConfig tableConfig = _testBase.createOfflineTableConfig();
-    _testBase.addTableConfig(tableConfig);
-    srcDs.addSink(new PinotSinkFunction<>(new 
FlinkRowGenericRowConverter(_testBase.TEST_TYPE_INFO), tableConfig,
-        _testBase.SCHEMA));
+    DataStream<Row> srcDs = 
execEnv.fromCollection(_data).returns(_typeInfo).keyBy(r -> r.getField(0));
+    srcDs.addSink(new PinotSinkFunction<>(new 
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema));
     execEnv.execute();
     Assert.assertEquals(getNumSegments(), 2);
     Assert.assertEquals(getTotalNumDocs(), 6);
-    dropTable();
-  }
 
-  private void dropTable()
-      throws IOException {
-    String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(_testBase.getTableName());
-    _testBase.dropOfflineTable(tableNameWithType);
+    deleteSchema(RAW_TABLE_NAME);
+    dropOfflineTable(OFFLINE_TABLE_NAME);
   }
 
   private int getNumSegments()
       throws IOException {
-    String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(_testBase.getTableName());
-    String jsonOutputStr = 
_testBase.sendGetRequest(_testBase.getControllerRequestURLBuilder()
-        .forSegmentListAPIWithTableType(tableNameWithType, 
TableType.OFFLINE.toString()));
+    String jsonOutputStr = ControllerTest.sendGetRequest(
+        
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(OFFLINE_TABLE_NAME, 
TableType.OFFLINE.toString()));
     JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
     return array.get(0).get("OFFLINE").size();
   }
 
   private int getTotalNumDocs()
       throws IOException {
-    String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(_testBase.getTableName());
-    String jsonOutputStr = 
_testBase.sendGetRequest(_testBase.getControllerRequestURLBuilder()
-        .forSegmentListAPIWithTableType(tableNameWithType, 
TableType.OFFLINE.toString()));
+    String jsonOutputStr = ControllerTest.sendGetRequest(
+        
_controllerRequestURLBuilder.forSegmentListAPIWithTableType(OFFLINE_TABLE_NAME, 
TableType.OFFLINE.toString()));
     JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
     JsonNode segments = array.get(0).get("OFFLINE");
     int totalDocCount = 0;
     for (int i = 0; i < segments.size(); i++) {
       String segmentName = segments.get(i).asText();
-      jsonOutputStr = _testBase.sendGetRequest(
-          
_testBase.getControllerRequestURLBuilder().forSegmentMetadata(tableNameWithType,
 segmentName));
+      jsonOutputStr = ControllerTest.sendGetRequest(
+          _controllerRequestURLBuilder.forSegmentMetadata(OFFLINE_TABLE_NAME, 
segmentName));
       JsonNode metadata = JsonUtils.stringToJsonNode(jsonOutputStr);
       totalDocCount += metadata.get("segment.total.docs").asInt();
     }
diff --git a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/PinotTestBase.java b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/PinotTestBase.java
deleted file mode 100644
index cbedece..0000000
--- a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/PinotTestBase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.connector.flink.util;
-
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
-import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-
-
-public class PinotTestBase extends BaseClusterIntegrationTest {
-
-  public static final List<Row> TEST_DATA = new ArrayList<>();
-  public static final RowTypeInfo TEST_TYPE_INFO =
-      new RowTypeInfo(new TypeInformation[]{Types.INT, Types.LONG, 
Types.STRING}, new String[]{"a", "b", "c"});
-  public static final Schema SCHEMA = new 
Schema.SchemaBuilder().addSingleValueDimension("a", FieldSpec.DataType.INT)
-      .addSingleValueDimension("b", 
FieldSpec.DataType.LONG).addSingleValueDimension("c", FieldSpec.DataType.STRING)
-      .setPrimaryKeyColumns(Lists.newArrayList("a")).build();
-
-  static {
-    TEST_DATA.add(Row.of(1, 1L, "Hi"));
-    TEST_DATA.add(Row.of(2, 2L, "Hello"));
-    TEST_DATA.add(Row.of(3, 3L, "Hello world"));
-    TEST_DATA.add(Row.of(4, 4L, "Hello world!"));
-    TEST_DATA.add(Row.of(5, 4L, "HelloWorld"));
-    TEST_DATA.add(Row.of(6, 5L, "Hello!world!"));
-  }
-
-  public void startCluster()
-      throws Exception {
-    startZk();
-    startController();
-    startBroker();
-    startServer();
-  }
-
-  @Override
-  @Nullable
-  protected IngestionConfig getIngestionConfig() {
-    Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
getTarDir().getAbsolutePath());
-    batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
-    batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
getControllerBaseApiUrl());
-    return new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-        null, null, null);
-  }
-
-  public void stopCluster()
-      throws Exception {
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-  }
-
-  public File getTempDir() {
-    return _tempDir;
-  }
-
-  public File getSegmentDir() {
-    return _segmentDir;
-  }
-
-  public File getTarDir() {
-    return _tarDir;
-  }
-
-  public String getControllerBaseApiUrl() {
-    return _controllerBaseApiUrl;
-  }
-
-  @Override
-  public String getTableName() {
-    return super.getTableName();
-  }
-
-  @Override
-  public TableConfig createOfflineTableConfig() {
-    return super.createOfflineTableConfig();
-  }
-
-  @Override
-  public void addTableConfig(TableConfig tableConfig)
-      throws IOException {
-    super.addTableConfig(tableConfig);
-  }
-
-  public ControllerRequestURLBuilder getControllerRequestURLBuilder() {
-    return _controllerRequestURLBuilder;
-  }
-
-  @Override
-  public void dropOfflineTable(String tableNameWithType)
-      throws IOException {
-    super.dropOfflineTable(tableNameWithType);
-  }
-}
diff --git a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java
deleted file mode 100644
index db749ed..0000000
--- a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.connector.flink.util;
-
-import java.io.File;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-
-
-public final class TestUtils {
-
-  private TestUtils() {
-  }
-
-  public static void ensureDirectoriesExistAndEmpty(@Nonnull File... dirs)
-      throws IOException {
-    for (File dir : dirs) {
-      FileUtils.deleteDirectory(dir);
-      Assert.assertTrue(dir.mkdirs());
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to