Hisoka-X commented on code in PR #4147:
URL:
https://github.com/apache/incubator-seatunnel/pull/4147#discussion_r1109219030
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java:
##########
@@ -114,34 +113,32 @@ public void deserialize(SourceRecord record,
Collector<SeaTunnelRow> collector)
Schema valueSchema = record.valueSchema();
Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
- String database = sourceStruct.getString(DATABASE_NAME_KEY);
String tableName =
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
- String tableId = database + ":" + tableName;
Review Comment:
Why change tableId to tableName instead of database+tableName? Shouldn't Zeta
support multiple databases in one pipeline?
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+@ToString
+@NoArgsConstructor
+public class ShuffleConfig implements Config {
+ public static final int DEFAULT_QUEUE_SIZE = 2048;
+ public static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
+ public static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
+ public static final int DEFAULT_BATCH_SIZE = 1024;
+ public static final long DEFAULT_BATCH_FLUSH_INTERVAL =
TimeUnit.SECONDS.toMillis(3);
+
+ private int batchSize;
+ private long batchFlushInterval;
+ private ShuffleStrategy shuffleStrategy;
+
+ @Getter
+ @Setter
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @ToString
+ public abstract static class ShuffleStrategy implements Serializable {
+ private long jobId;
+ private int inputPartitions;
+ private int targetPartitions;
+ private QueueConfig queueConfig;
+
+ public abstract Map<String, IQueue<Record<?>>>
createShuffles(HazelcastInstance hazelcast, int inputIndex);
+
+ public abstract String createShuffleKey(Record<?> record, int
inputIndex);
+
+ public abstract IQueue<Record<?>>[] getShuffles(HazelcastInstance
hazelcast, int targetIndex);
+
+ protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast,
String queueName) {
+ QueueConfig targetQueueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
+ if (queueConfig != null) {
+ targetQueueConfig.setMaxSize(queueConfig.getMaxSize())
+ .setBackupCount(queueConfig.getBackupCount())
+ .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+ .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+ }
+ return hazelcast.getQueue(queueName);
+ }
+ }
+
+ @NoArgsConstructor
+ @ToString
+ public static class PartitionShuffleStrategy extends ShuffleStrategy {
+ private final Map<Integer, String[]> inputQueueMapping = new
HashMap<>();
+
+ public PartitionShuffleStrategy(long jobId,
+ int inputPartitions,
+ int targetPartitions) {
+ this(jobId, inputPartitions, targetPartitions, null);
+ }
+
+ public PartitionShuffleStrategy(long jobId,
+ int inputPartitions,
+ int targetPartitions,
+ QueueConfig queueConfig) {
+ super(jobId, inputPartitions, targetPartitions, queueConfig);
+ }
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance
hazelcast, int inputIndex) {
+ checkArgument(inputIndex >= 0 && inputIndex <
getInputPartitions());
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (int targetIndex = 0; targetIndex < getTargetPartitions();
targetIndex++) {
+ String queueName = generateQueueName(inputIndex, targetIndex);
+ IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+ shuffleMap.put(queueName, queue);
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String createShuffleKey(Record<?> record, int inputIndex) {
+ String[] inputQueueNames =
inputQueueMapping.computeIfAbsent(inputIndex, key -> {
+ String[] queueNames = new String[getTargetPartitions()];
+ for (int targetIndex = 0; targetIndex < getTargetPartitions();
targetIndex++) {
+ queueNames[targetIndex] = generateQueueName(key,
targetIndex);
+ }
+ return queueNames;
+ });
+ return
inputQueueNames[ThreadLocalRandom.current().nextInt(getTargetPartitions())];
+ }
+
+ @Override
+ public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast,
int targetIndex) {
+ checkArgument(targetIndex >= 0 && targetIndex <
getTargetPartitions());
+ IQueue<Record<?>>[] shuffles = new IQueue[getInputPartitions()];
+ for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
+ String queueName = generateQueueName(inputIndex, targetIndex);
+ shuffles[inputIndex] = getIQueue(hazelcast, queueName);
+ }
+ return shuffles;
+ }
+
+ private String generateQueueName(int inputIndex, int targetIndex) {
+ return String.format("PartitionShuffle-Queue[%s-%s-%s]",
getJobId(), inputIndex, targetIndex);
+ }
+ }
+
+ @Getter
+ @Setter
+ @NoArgsConstructor
+ @ToString
+ public static class MultipleRowShuffleStrategy extends ShuffleStrategy {
+ private MultipleRowType inputRowType;
+ private String targetTableId;
+
+ public MultipleRowShuffleStrategy(long jobId,
+ int inputPartitions,
+ MultipleRowType inputRowType,
+ int targetPartitions,
+ String targetTableId) {
+ this(jobId, inputPartitions, inputRowType, targetPartitions,
targetTableId, null);
+ }
+
+ public MultipleRowShuffleStrategy(long jobId,
+ int inputPartitions,
+ MultipleRowType inputRowType,
+ int targetPartitions,
+ String targetTableId,
+ QueueConfig queueConfig) {
+ super(jobId, inputPartitions, targetPartitions, queueConfig);
+ this.inputRowType = inputRowType;
+ this.targetTableId = targetTableId;
+ }
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance
hazelcast, int inputIndex) {
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (Map.Entry<String, SeaTunnelRowType> entry : inputRowType) {
+ String tableId = entry.getKey();
+ String queueName = generateQueueName(inputIndex, tableId);
+ IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+ shuffleMap.put(tableId, queue);
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String createShuffleKey(Record<?> record, int inputIndex) {
+ String tableId = ((SeaTunnelRow) record.getData()).getTableId();
+ return generateQueueName(inputIndex, tableId);
+ }
+
+ @Override
+ public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast,
int targetIndex) {
+ IQueue<Record<?>>[] queues = new IQueue[getInputPartitions()];
+ for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
+ String queueName = generateQueueName(inputIndex,
targetTableId);
+ queues[inputIndex] = getIQueue(hazelcast, queueName);
+ }
+ return queues;
+ }
+
+ private String generateQueueName(int inputIndex, String tableId) {
+ return "MultipleRowShuffle-Queue[" + getJobId() + "-" + inputIndex
+ "-" + tableId + "]";
Review Comment:
Same as above
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+@ToString
+@NoArgsConstructor
+public class ShuffleConfig implements Config {
+ public static final int DEFAULT_QUEUE_SIZE = 2048;
+ public static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
+ public static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
+ public static final int DEFAULT_BATCH_SIZE = 1024;
+ public static final long DEFAULT_BATCH_FLUSH_INTERVAL =
TimeUnit.SECONDS.toMillis(3);
+
+ private int batchSize;
+ private long batchFlushInterval;
+ private ShuffleStrategy shuffleStrategy;
+
+ @Getter
+ @Setter
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @ToString
+ public abstract static class ShuffleStrategy implements Serializable {
+ private long jobId;
+ private int inputPartitions;
+ private int targetPartitions;
+ private QueueConfig queueConfig;
+
+ public abstract Map<String, IQueue<Record<?>>>
createShuffles(HazelcastInstance hazelcast, int inputIndex);
+
+ public abstract String createShuffleKey(Record<?> record, int
inputIndex);
+
+ public abstract IQueue<Record<?>>[] getShuffles(HazelcastInstance
hazelcast, int targetIndex);
+
+ protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast,
String queueName) {
+ QueueConfig targetQueueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
+ if (queueConfig != null) {
+ targetQueueConfig.setMaxSize(queueConfig.getMaxSize())
+ .setBackupCount(queueConfig.getBackupCount())
+ .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+ .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+ }
+ return hazelcast.getQueue(queueName);
+ }
+ }
+
+ @NoArgsConstructor
+ @ToString
+ public static class PartitionShuffleStrategy extends ShuffleStrategy {
+ private final Map<Integer, String[]> inputQueueMapping = new
HashMap<>();
+
+ public PartitionShuffleStrategy(long jobId,
+ int inputPartitions,
+ int targetPartitions) {
+ this(jobId, inputPartitions, targetPartitions, null);
+ }
+
+ public PartitionShuffleStrategy(long jobId,
+ int inputPartitions,
+ int targetPartitions,
+ QueueConfig queueConfig) {
+ super(jobId, inputPartitions, targetPartitions, queueConfig);
+ }
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance
hazelcast, int inputIndex) {
+ checkArgument(inputIndex >= 0 && inputIndex <
getInputPartitions());
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (int targetIndex = 0; targetIndex < getTargetPartitions();
targetIndex++) {
+ String queueName = generateQueueName(inputIndex, targetIndex);
+ IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+ shuffleMap.put(queueName, queue);
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String createShuffleKey(Record<?> record, int inputIndex) {
+ String[] inputQueueNames =
inputQueueMapping.computeIfAbsent(inputIndex, key -> {
+ String[] queueNames = new String[getTargetPartitions()];
+ for (int targetIndex = 0; targetIndex < getTargetPartitions();
targetIndex++) {
+ queueNames[targetIndex] = generateQueueName(key,
targetIndex);
+ }
+ return queueNames;
+ });
+ return
inputQueueNames[ThreadLocalRandom.current().nextInt(getTargetPartitions())];
+ }
+
+ @Override
+ public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast,
int targetIndex) {
+ checkArgument(targetIndex >= 0 && targetIndex <
getTargetPartitions());
+ IQueue<Record<?>>[] shuffles = new IQueue[getInputPartitions()];
+ for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
+ String queueName = generateQueueName(inputIndex, targetIndex);
+ shuffles[inputIndex] = getIQueue(hazelcast, queueName);
+ }
+ return shuffles;
+ }
+
+ private String generateQueueName(int inputIndex, int targetIndex) {
+ return String.format("PartitionShuffle-Queue[%s-%s-%s]",
getJobId(), inputIndex, targetIndex);
Review Comment:
Should use both the jobId and the pipelineId
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -174,40 +176,29 @@ public void init(long initializationTimestamp) {
flakeIdGenerator,
runningJobStateIMap,
runningJobStateTimestampsIMap,
- engineConfig.getQueueType());
+ engineConfig);
this.physicalPlan = planTuple.f0();
this.physicalPlan.setJobMaster(this);
this.checkpointPlanMap = planTuple.f1();
this.initStateFuture();
}
public void initCheckPointManager() throws CheckpointStorageException {
- CheckpointConfig checkpointConfig =
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
- jobImmutableInformation.getJobConfig().getEnvOptions());
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
jobImmutableInformation.isStartWithSavePoint(),
nodeEngine,
this,
checkpointPlanMap,
- checkpointConfig);
+ engineConfig.getCheckpointConfig());
}
// TODO replace it after ReadableConfig Support parse yaml format, then
use only one config to read engine and env config.
- private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine,
Map<String, Object> env) {
- CheckpointConfig checkpointConfig = new CheckpointConfig();
+ private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig
engineConfig, Map<String, Object> env) {
if (env.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- checkpointConfig.setCheckpointInterval((Integer)
env.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+ engineConfig.setCheckpointInterval((Integer)
env.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
}
- checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
Review Comment:
Why remove the checkpoint configuration for the timeout and the other settings?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java:
##########
@@ -463,13 +462,6 @@ private void setFlowConfig(Flow f, int parallelismIndex) {
config.setCommitterTask(committerTaskIDMap.get((SinkAction<?, ?, ?, ?>)
flow.getAction()));
}
flow.setConfig(config);
- } else if (flow.getAction() instanceof ShuffleAction) {
Review Comment:
Where is the shuffle logic from the executionPlan parsed into the physicalPlan?
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+@ToString
+@NoArgsConstructor
+public class ShuffleConfig implements Config {
+ public static final int DEFAULT_QUEUE_SIZE = 2048;
+ public static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
+ public static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
+ public static final int DEFAULT_BATCH_SIZE = 1024;
+ public static final long DEFAULT_BATCH_FLUSH_INTERVAL =
TimeUnit.SECONDS.toMillis(3);
+
+ private int batchSize;
+ private long batchFlushInterval;
+ private ShuffleStrategy shuffleStrategy;
+
+ @Getter
+ @Setter
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @ToString
+ public abstract static class ShuffleStrategy implements Serializable {
+ private long jobId;
+ private int inputPartitions;
+ private int targetPartitions;
+ private QueueConfig queueConfig;
+
+ public abstract Map<String, IQueue<Record<?>>>
createShuffles(HazelcastInstance hazelcast, int inputIndex);
+
+ public abstract String createShuffleKey(Record<?> record, int
inputIndex);
+
+ public abstract IQueue<Record<?>>[] getShuffles(HazelcastInstance
hazelcast, int targetIndex);
+
+ protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast,
String queueName) {
+ QueueConfig targetQueueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
+ if (queueConfig != null) {
+ targetQueueConfig.setMaxSize(queueConfig.getMaxSize())
+ .setBackupCount(queueConfig.getBackupCount())
+ .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
+ .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
+ }
+ return hazelcast.getQueue(queueName);
+ }
+ }
+
+ @NoArgsConstructor
+ @ToString
+ public static class PartitionShuffleStrategy extends ShuffleStrategy {
+ private final Map<Integer, String[]> inputQueueMapping = new
HashMap<>();
+
+ public PartitionShuffleStrategy(long jobId,
+ int inputPartitions,
+ int targetPartitions) {
+ this(jobId, inputPartitions, targetPartitions, null);
+ }
+
+ public PartitionShuffleStrategy(long jobId,
+ int inputPartitions,
+ int targetPartitions,
+ QueueConfig queueConfig) {
+ super(jobId, inputPartitions, targetPartitions, queueConfig);
+ }
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance
hazelcast, int inputIndex) {
+ checkArgument(inputIndex >= 0 && inputIndex <
getInputPartitions());
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (int targetIndex = 0; targetIndex < getTargetPartitions();
targetIndex++) {
+ String queueName = generateQueueName(inputIndex, targetIndex);
+ IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+ shuffleMap.put(queueName, queue);
Review Comment:
Maybe we should call `clear` before putting it into the map
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]