[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1164072696 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierWriter.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TierType; + +import java.io.IOException; + +/** + * The single storage tier writer for the tiered store. The storage tier writers is independent of + * each other. Through the {@link TierWriter}, we can create {@link TierStorage} to store shuffle + * data. + */ +public interface TierWriter { + +void setup() throws IOException; + +/** Create the {@link TierStorage} of the {@link TierWriter} to write shuffle data. */ +TierStorage createPartitionTierStorage(); Review Comment: Fixed the issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161498735 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { + +/** + * The shuffle records will be accumulated to the finished buffer before writing to the + * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}. + */ +enum TierType implements TieredStoreMode { +IN_CACHE, +IN_MEM, +IN_DISK, +IN_REMOTE, Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502620 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * This is a common entrypoint of emitting records to tiered store. These records will be emitted to + * the {@link BufferAccumulator} to accumulate and transform into finished buffers. 
+ */ +public class TieredStoreProducerImpl implements TieredStoreProducer { + +private final boolean isBroadcastOnly; + +private final int numSubpartitions; + +private final BufferAccumulator bufferAccumulator; + +public TieredStoreProducerImpl( +StorageTier[] storageTiers, +int numSubpartitions, +int bufferSize, +boolean isBroadcastOnly, +@Nullable BufferCompressor bufferCompressor) { +this.isBroadcastOnly = isBroadcastOnly; +this.numSubpartitions = numSubpartitions; + +this.bufferAccumulator = new BufferAccumulatorImpl(); Review Comment: I have updated the producer constructor and made this a constructor argument. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180122783 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** Client of the Tiered Storage used by the producer. 
*/ +public class TieredStorageProducerClient { +private final boolean isBroadcastOnly; + +private final int numSubpartitions; + +private final BufferAccumulator bufferAccumulator; + +private final BufferCompressor bufferCompressor; + +private final List<TierProducerAgent> tierProducerAgents; + +public TieredStorageProducerClient( +int numSubpartitions, +boolean isBroadcastOnly, +BufferAccumulator bufferAccumulator, +@Nullable BufferCompressor bufferCompressor, +List<TierProducerAgent> tierProducerAgents) { +this.isBroadcastOnly = isBroadcastOnly; +this.numSubpartitions = numSubpartitions; +this.bufferAccumulator = bufferAccumulator; +this.bufferCompressor = bufferCompressor; +this.tierProducerAgents = tierProducerAgents; + +bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers); +} + +public void write( +ByteBuffer record, +TieredStorageSubpartitionId subpartitionId, +Buffer.DataType dataType, +boolean isBroadcast) +throws IOException { + +if (isBroadcast && !isBroadcastOnly) { +for (int i = 0; i < numSubpartitions; ++i) { +bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType); +} +} else { +bufferAccumulator.receive(record, subpartitionId, dataType); +} +} + +public void close() { +bufferAccumulator.close(); +tierProducerAgents.forEach(TierProducerAgent::close); +} + +public void writeFinishedBuffers( Review Comment: Renamed it.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180092886 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The abstract unique identification for the Tiered Storage. */ +public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable { + +private static final long serialVersionUID = -948472905048472823L; + +/** ID represented by a byte array. */ +protected final byte[] id; + +/** Pre-calculated hash-code for acceleration. 
*/ +protected final int hashCode; + +public TieredStorageAbstractId(byte[] id) { +checkArgument(id != null, "Must be not null."); + +this.id = id; +this.hashCode = Arrays.hashCode(id); +} + +public TieredStorageAbstractId(int length) { +checkArgument(length > 0, "Must be positive."); + +this.id = randomBytes(length); +this.hashCode = Arrays.hashCode(id); +} + +public byte[] getId() { Review Comment: Renamed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502932 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. 
*/ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; + +/** + * Before start a new segment, the writer side calls the method to check whether the {@link + * StorageTier} can store the next segment. + */ +boolean canStoreNextSegment(int subpartitionId); + +/** + * Before reading the segment data, the consumer calls the method to check whether the {@link + * StorageTier} has the segment. + */ +boolean hasCurrentSegment(int subpartitionId, int segmentIndex); + +/** Sets the metric group. */ +void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics); + +void close(); + +void release(); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502777 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { + +/** + * The shuffle records will be accumulated to the finished buffer before writing to the + * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}. + */ +enum TierType implements TieredStoreMode { +IN_CACHE, +IN_MEM, +IN_DISK, +IN_REMOTE, +} + +/** + * Currently, only these tier combinations are supported. If the configured tiers is contained + * in the following combinations, an exception will be thrown. + */ +enum SupportedTierCombinations implements TieredStoreMode { +MEMORY, +MEMORY_DISK, +MEMORY_DISK_REMOTE, Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161498829 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187058942 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TieredStorageTestUtils.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +/** The test utils for the tiered storage tests. */ +public class TieredStorageTestUtils { Review Comment: Removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187058705 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId; + +/** + * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations + * with the shuffle master should be wrapped in this class. 
*/ +public class TieredInternalShuffleMaster { + +private final TieredStorageMasterClient tieredStorageMasterClient; + +private final Map<JobID, List<ResultPartitionID>> jobPartitionIds; + +public TieredInternalShuffleMaster(Configuration conf) { +TieredStorageConfiguration tieredStorageConfiguration = +createTieredStorageConfiguration(conf); +ResourceRegistry resourceRegistry = new ResourceRegistry(); +List<TierMasterAgent> tierFactories = +tieredStorageConfiguration.getTierFactories().stream() +.map(TierFactory::createMasterAgent) +.peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry)) +.collect(Collectors.toList()); +this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories); +this.jobPartitionIds = new HashMap<>(); +} + +public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) { +jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID); +tieredStorageMasterClient.addPartition(convertId(resultPartitionID)); +} + +public void releasePartition(ResultPartitionID resultPartitionID) { + tieredStorageMasterClient.releasePartition(convertId(resultPartitionID)); +} + +public void unregisterJob(JobID jobID) { +List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID); +if (resultPartitionIDs != null) { +resultPartitionIDs.forEach( +resultPartitionID -> +tieredStorageMasterClient.releasePartition( +convertId(resultPartitionID))); +} +} + +private TieredStorageConfiguration createTieredStorageConfiguration(Configuration conf) { +// TODO, from the configuration, get the configured options(i.e., remote storage path, the +// reserved storage size, etc.), then set them to the builder. Review Comment: Replaced this with `TieredStorageConfiguration#fromConfiguration`.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187058204 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId; + +/** + * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations + * with the shuffle master should be wrapped in this class. 
*/ +public class TieredInternalShuffleMaster { + +private final TieredStorageMasterClient tieredStorageMasterClient; + +private final Map<JobID, List<ResultPartitionID>> jobPartitionIds; + +public TieredInternalShuffleMaster(Configuration conf) { +TieredStorageConfiguration tieredStorageConfiguration = +createTieredStorageConfiguration(conf); +ResourceRegistry resourceRegistry = new ResourceRegistry(); +List<TierMasterAgent> tierFactories = +tieredStorageConfiguration.getTierFactories().stream() +.map(TierFactory::createMasterAgent) +.peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry)) +.collect(Collectors.toList()); +this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories); +this.jobPartitionIds = new HashMap<>(); +} + +public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) { +jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID); +tieredStorageMasterClient.addPartition(convertId(resultPartitionID)); +} + +public void releasePartition(ResultPartitionID resultPartitionID) { + tieredStorageMasterClient.releasePartition(convertId(resultPartitionID)); +} + +public void unregisterJob(JobID jobID) { +List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID); +if (resultPartitionIDs != null) { +resultPartitionIDs.forEach( +resultPartitionID -> +tieredStorageMasterClient.releasePartition( +convertId(resultPartitionID))); +} +} Review Comment: OK. Because we cannot know the exact job ID when releasing a result partition, I added a `partitionJobIds` map that records the job ID of each `resultPartitionID`. 1. When adding a partition, record it in both `jobPartitionIds` and `partitionJobIds`. 2. When releasing a partition, remove its entry from both `jobPartitionIds` and `partitionJobIds`. 3. When unregistering a job, remove all of that job's partition IDs from both `jobPartitionIds` and `partitionJobIds`.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
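The three bookkeeping steps listed in the review comment above can be sketched roughly as follows. This is only an illustration of the two-map idea, not the code that was pushed to the PR: the class name `PartitionBookkeepingSketch`, the `released` list, and the use of plain `String` ids in place of `JobID` and `ResultPartitionID` are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in: String ids replace JobID and ResultPartitionID. */
final class PartitionBookkeepingSketch {

    // job id -> ids of the partitions that belong to the job
    private final Map<String, Set<String>> jobPartitionIds = new HashMap<>();

    // partition id -> id of the owning job (the reverse index from the comment)
    private final Map<String, String> partitionJobIds = new HashMap<>();

    // Stands in for the call to the tiered storage master client.
    final List<String> released = new ArrayList<>();

    // Step 1: record the pair in both maps.
    void addPartition(String jobId, String partitionId) {
        jobPartitionIds.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(partitionId);
        partitionJobIds.put(partitionId, jobId);
    }

    // Step 2: the caller only knows the partition id, so look the job up first.
    void releasePartition(String partitionId) {
        String jobId = partitionJobIds.remove(partitionId);
        if (jobId == null) {
            return; // unknown or already released (a choice made by this sketch)
        }
        Set<String> ids = jobPartitionIds.get(jobId);
        if (ids != null) {
            ids.remove(partitionId);
            if (ids.isEmpty()) {
                jobPartitionIds.remove(jobId);
            }
        }
        released.add(partitionId);
    }

    // Step 3: drop every remaining partition of the job from both maps.
    void unregisterJob(String jobId) {
        Set<String> ids = jobPartitionIds.remove(jobId);
        if (ids == null) {
            return;
        }
        for (String id : ids) {
            partitionJobIds.remove(id);
            released.add(id);
        }
    }

    int trackedPartitions() {
        return partitionJobIds.size();
    }

    boolean tracksJob(String jobId) {
        return jobPartitionIds.containsKey(jobId);
    }
}
```

Keeping the reverse index means a partition released on its own no longer lingers in the per-job list and is not released a second time when the job is unregistered.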
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187054589 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.ResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId; + +/** + * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations + * with the shuffle master should be wrapped in this class. + */ +public class TieredInternalShuffleMaster { + +private final TieredStorageMasterClient tieredStorageMasterClient; + +private final Map> jobPartitionIds; + +public TieredInternalShuffleMaster(Configuration conf) { +TieredStorageConfiguration tieredStorageConfiguration = +createTieredStorageConfiguration(conf); +ResourceRegistry resourceRegistry = new ResourceRegistry(); +List tierFactories = +tieredStorageConfiguration.getTierFactories().stream() +.map(TierFactory::createMasterAgent) +.peek(tierMasterAgent -> tierMasterAgent.setup(resourceRegistry)) Review Comment: Fixed it. Added an argument for the `TierFactory#createMasterAgent` to avoid the `setup` method. -- This is an automated message from the Apache Git Service. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187053970 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredResource.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +/** The resource (e.g., local files, remote storage files, etc.) for the Tiered Storage. */ +public interface TieredResource { Review Comment: Renamed it. In addition, I also renamed the `ResourceRegistry` to `TieredStorageResourceRegistry`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187053105 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +/** Configurations for the Tiered Storage. */ +public class TieredStorageConfiguration { Review Comment: Fixed. I have moved the internal types and internal methods to the tail of the class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1187052528 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java: ## @@ -23,6 +23,9 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; Review Comment: OK, we can remove the dependencies in a follow-up ticket.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180124978 ## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java: ## @@ -49,6 +57,10 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> private final int networkBufferSize; +private final ResourceRegistry resourceRegistry; Review Comment: Added a `TieredInternalShuffleMaster` wrapper class. All tiered storage operations that involve the shuffle master should go through this class. The `ResourceRegistry` is now a local variable in the constructor of `TieredInternalShuffleMaster`.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180122783 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** Client of the Tiered Storage used by the producer. 
*/ +public class TieredStorageProducerClient { +private final boolean isBroadcastOnly; + +private final int numSubpartitions; + +private final BufferAccumulator bufferAccumulator; + +private final BufferCompressor bufferCompressor; + +private final List<TierProducerAgent> tierProducerAgents; + +public TieredStorageProducerClient( +int numSubpartitions, +boolean isBroadcastOnly, +BufferAccumulator bufferAccumulator, +@Nullable BufferCompressor bufferCompressor, +List<TierProducerAgent> tierProducerAgents) { +this.isBroadcastOnly = isBroadcastOnly; +this.numSubpartitions = numSubpartitions; +this.bufferAccumulator = bufferAccumulator; +this.bufferCompressor = bufferCompressor; +this.tierProducerAgents = tierProducerAgents; + +bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers); +} + +public void write( +ByteBuffer record, +TieredStorageSubpartitionId subpartitionId, +Buffer.DataType dataType, +boolean isBroadcast) +throws IOException { + +if (isBroadcast && !isBroadcastOnly) { +for (int i = 0; i < numSubpartitions; ++i) { +bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType); +} +} else { +bufferAccumulator.receive(record, subpartitionId, dataType); +} +} + +public void close() { +bufferAccumulator.close(); +tierProducerAgents.forEach(TierProducerAgent::close); +} + +public void writeFinishedBuffers( Review Comment: Renamed it. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** Client of the Tie
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180122535 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** Client of the Tiered Storage used by the producer. 
*/ +public class TieredStorageProducerClient { +private final boolean isBroadcastOnly; + +private final int numSubpartitions; + +private final BufferAccumulator bufferAccumulator; + +private final BufferCompressor bufferCompressor; + +private final List tierProducerAgents; + +public TieredStorageProducerClient( +int numSubpartitions, +boolean isBroadcastOnly, +BufferAccumulator bufferAccumulator, +@Nullable BufferCompressor bufferCompressor, +List tierProducerAgents) { +this.isBroadcastOnly = isBroadcastOnly; +this.numSubpartitions = numSubpartitions; +this.bufferAccumulator = bufferAccumulator; +this.bufferCompressor = bufferCompressor; +this.tierProducerAgents = tierProducerAgents; + +bufferAccumulator.setup(numSubpartitions, this::writeFinishedBuffers); +} + +public void write( +ByteBuffer record, +TieredStorageSubpartitionId subpartitionId, +Buffer.DataType dataType, +boolean isBroadcast) +throws IOException { + +if (isBroadcast && !isBroadcastOnly) { Review Comment: Added a doc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
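The `isBroadcast && !isBroadcastOnly` branch quoted above can be illustrated with a small standalone sketch. Everything here is hypothetical: `BroadcastWriteSketch` and `RecordSink` are stand-ins for the producer client and the buffer accumulator, and subpartitions are plain `int` indexes. The sketch also assumes that each copy of a broadcast record is addressed to a different subpartition, which is one reading of the branch rather than something stated in the thread.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for the producer-side write path. */
final class BroadcastWriteSketch {

    /** Hypothetical sink; stands in for the buffer accumulator. */
    interface RecordSink {
        void receive(ByteBuffer record, int subpartitionId);
    }

    private final int numSubpartitions;
    private final boolean isBroadcastOnly;
    private final RecordSink sink;

    BroadcastWriteSketch(int numSubpartitions, boolean isBroadcastOnly, RecordSink sink) {
        this.numSubpartitions = numSubpartitions;
        this.isBroadcastOnly = isBroadcastOnly;
        this.sink = sink;
    }

    void write(ByteBuffer record, int subpartitionId, boolean isBroadcast) {
        if (isBroadcast && !isBroadcastOnly) {
            // A broadcast record in a partition that is not broadcast-only is
            // written once per subpartition. duplicate() shares the bytes but
            // gives every copy its own position and limit.
            for (int i = 0; i < numSubpartitions; i++) {
                sink.receive(record.duplicate(), i);
            }
        } else {
            // Either a normal record, or a broadcast-only partition where a
            // single written copy is enough.
            sink.receive(record, subpartitionId);
        }
    }
}
```

Because every copy has its own position, a sink that consumes one duplicate does not affect what the next subpartition sees.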
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180122114 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Client of the Tiered Storage used by the master. 
*/ +public class TieredStorageMasterClient { + +private final List<TierMasterAgent> tiers; + +private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds; + +private final ResourceRegistry resourceRegistry; + +public TieredStorageMasterClient( +List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) { +this.tiers = tiers; +this.resourceRegistry = resourceRegistry; +this.jobPartitionIds = new HashMap<>(); +} + +public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) { +jobPartitionIds.computeIfAbsent(jobId, ignore -> new ArrayList<>()).add(partitionId); +resourceRegistry.registerResource( +partitionId, () -> tiers.forEach(TierMasterAgent::release)); Review Comment: These methods have been renamed to `addPartition` and `releasePartition`. Each tier decides for itself what to do, i.e., whether to register or release resources. (That code is not included yet; it will be added when each tier is implemented.)
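A rough sketch of the delegation described in the comment above, where the master client only forwards `addPartition` and `releasePartition` and each tier decides what, if anything, to track. The names `TierDelegationSketch`, `TierAgent`, `MemoryTierAgent`, and `RemoteTierAgent`, the `String` partition ids, and the `/remote/` path prefix are all made up for illustration; the per-tier behavior in the PR had not been written at this point in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for the master client. */
final class TierDelegationSketch {

    /** Hypothetical per-tier hook; the real TierMasterAgent may look different. */
    interface TierAgent {
        void addPartition(String partitionId);

        void releasePartition(String partitionId);
    }

    /** A tier with nothing to track on the master side. */
    static final class MemoryTierAgent implements TierAgent {
        @Override
        public void addPartition(String partitionId) {}

        @Override
        public void releasePartition(String partitionId) {}
    }

    /** A tier that tracks one (pretend) remote directory per partition. */
    static final class RemoteTierAgent implements TierAgent {
        final Set<String> directories = new TreeSet<>();

        @Override
        public void addPartition(String partitionId) {
            directories.add("/remote/" + partitionId);
        }

        @Override
        public void releasePartition(String partitionId) {
            directories.remove("/remote/" + partitionId);
        }
    }

    private final List<TierAgent> tiers;

    TierDelegationSketch(List<TierAgent> tiers) {
        this.tiers = new ArrayList<>(tiers);
    }

    // The client holds no resource logic of its own; it only fans out.
    void addPartition(String partitionId) {
        tiers.forEach(tier -> tier.addPartition(partitionId));
    }

    void releasePartition(String partitionId) {
        tiers.forEach(tier -> tier.releasePartition(partitionId));
    }
}
```

With this shape, adding a new tier does not require touching the client: the tier brings its own register and release behavior.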
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180118466 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Client of the Tiered Storage used by the master. */ +public class TieredStorageMasterClient { + +private final List tiers; + +private final Map> jobPartitionIds; Review Comment: OK, good point. Added a `TieredInternalShuffleMaster` wrapper class. All the tiered storage operations with the shuffle master should be wrapped in this class. -- This is an automated message from the Apache Git Service. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180116664 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageJobId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Client of the Tiered Storage used by the master. 
*/ +public class TieredStorageMasterClient { + +private final List<TierMasterAgent> tiers; + +private final Map<TieredStorageJobId, List<TieredStoragePartitionId>> jobPartitionIds; + +private final ResourceRegistry resourceRegistry; + +public TieredStorageMasterClient( +List<TierMasterAgent> tiers, ResourceRegistry resourceRegistry) { +this.tiers = tiers; +this.resourceRegistry = resourceRegistry; +this.jobPartitionIds = new HashMap<>(); +} + +public void registerResource(TieredStorageJobId jobId, TieredStoragePartitionId partitionId) { Review Comment: A topic ID is not needed here, because the result partition ID is already unique: `ResultPartitionID` holds an `IntermediateResultPartitionID` as one of its fields, and that `IntermediateResultPartitionID` already includes the `IntermediateDataSetID`. In addition, these methods have been renamed to `addPartition` and `releasePartition`.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180110166 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java: ## @@ -29,6 +34,18 @@ public class TieredStorageUtils { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; +public static List getTierFactoriesFromConfiguration() { +TieredStorageConfiguration tieredStorageConfiguration = +TieredStorageConfiguration.builder() +.setTierTypes(TieredStorageConfiguration.memoryDiskTierTypes()) +.build(); Review Comment: Removed `getTierFactoriesFromConfiguration` from `TieredStorageUtils`. In addition, `TieredStorageUtils` is no longer used, so I removed it as well. The factories can now be obtained with `TieredStorageConfiguration#getTierFactories`. I also removed `setTierTypes`, because in the first version the tier types do not need to be set from outside the configuration.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180104030 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +/** Configurations for the Tiered Storage. */ +public class TieredStorageConfiguration { + +private enum TierType { +IN_MEM, +IN_DISK, +IN_REMOTE, +} + +private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES = +new TierType[] {TierType.IN_MEM, TierType.IN_DISK}; + +private final TierType[] tierTypes; + +private TieredStorageConfiguration(TierType[] tierTypes) { +this.tierTypes = tierTypes; +} + +public int[] getTierIndexes() { +int[] tierIndexes = new int[tierTypes.length]; +for (int i = 0; i < tierTypes.length; i++) { +tierIndexes[i] = i; +} +return tierIndexes; +} + +public static TierType[] memoryDiskTierTypes() { Review Comment: Removed it. In the first version, only the default tier type is supported, so I removed `memoryDiskTierTypes `. Currently, the default value is a fixed value. 
So I changed `TieredStorageConfiguration#getDefaultTierTypes` into a private method, and the tier types are now obtained when building the configuration. There is a TODO here: `getDefaultTierTypes` should take a new argument in the future. After the option for the remote storage path is added, the method will look like this:
```
private static TierType[] getDefaultTierTypes(String remoteStorageHomePath) {
    return remoteStorageHomePath == null
            ? DEFAULT_MEMORY_DISK_TIER_TYPES
            : DEFAULT_MEMORY_DISK_REMOTE_TIER_TYPES;
}
```
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180097369 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.util.AbstractID; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; + +/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. 
*/ +public class TieredStorageIdMappingUtils { + +public static TieredStorageJobId convertId(JobID jobID) { +return new TieredStorageJobId(jobID.getBytes()); +} + +public static JobID convertId(TieredStorageJobId tieredStorageJobId) { +return new JobID(tieredStorageJobId.getId()); +} + +public static TieredStorageTopicId convertId(IntermediateDataSetID intermediateDataSetID) { +return new TieredStorageTopicId(intermediateDataSetID.getBytes()); +} + +public static IntermediateDataSetID convertId(TieredStorageTopicId topicId) { +return new IntermediateDataSetID(new AbstractID(topicId.getId())); +} + +public static TieredStoragePartitionId convertId(ResultPartitionID resultPartitionId) { +ByteBuf byteBuf = Unpooled.buffer(); Review Comment: Resolved it. Added a `getBytes` method to `ResultPartitionID`.
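As a rough illustration of what a `getBytes` method on a composite ID can look like without a Netty `ByteBuf`, the sketch below serializes a hypothetical ID with `java.nio.ByteBuffer`. The class `CompositeIdSketch`, its four `long` fields, and `fromBytes` are assumptions for illustration only; the actual layout of `ResultPartitionID` and the method added in the PR may differ.

```java
import java.nio.ByteBuffer;

/** Hypothetical composite ID made of two 128-bit parts, each stored as two longs. */
final class CompositeIdSketch {

    static final int SIZE_IN_BYTES = 4 * Long.BYTES;

    private final long partitionLower;
    private final long partitionUpper;
    private final long producerLower;
    private final long producerUpper;

    CompositeIdSketch(
            long partitionLower, long partitionUpper, long producerLower, long producerUpper) {
        this.partitionLower = partitionLower;
        this.partitionUpper = partitionUpper;
        this.producerLower = producerLower;
        this.producerUpper = producerUpper;
    }

    /** Serializes all parts into one fixed-size, big-endian byte array. */
    byte[] getBytes() {
        return ByteBuffer.allocate(SIZE_IN_BYTES)
                .putLong(partitionLower)
                .putLong(partitionUpper)
                .putLong(producerLower)
                .putLong(producerUpper)
                .array();
    }

    /** Rebuilds the ID from the array produced by getBytes(). */
    static CompositeIdSketch fromBytes(byte[] bytes) {
        if (bytes.length != SIZE_IN_BYTES) {
            throw new IllegalArgumentException("Expected " + SIZE_IN_BYTES + " bytes.");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        // Java evaluates arguments left to right, so the longs are read in write order.
        return new CompositeIdSketch(
                buffer.getLong(), buffer.getLong(), buffer.getLong(), buffer.getLong());
    }
}
```

Keeping the serialization inside the ID class means a conversion utility can simply pass `getBytes()` to the target ID's constructor, in the same way the `JobID` conversion above does.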
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180096713 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +/** Configurations for the Tiered Storage. */ +public class TieredStorageConfiguration { + +private enum TierType { +IN_MEM, +IN_DISK, +IN_REMOTE, +} + +private static final TierType[] DEFAULT_MEMORY_DISK_TIER_TYPES = +new TierType[] {TierType.IN_MEM, TierType.IN_DISK}; + +private final TierType[] tierTypes; + +private TieredStorageConfiguration(TierType[] tierTypes) { +this.tierTypes = tierTypes; +} + +public int[] getTierIndexes() { +int[] tierIndexes = new int[tierTypes.length]; +for (int i = 0; i < tierTypes.length; i++) { +tierIndexes[i] = i; +} +return tierIndexes; +} Review Comment: Removed it. We can use `TieredStorageConfiguration#getTierFactories` to get the factories, so this method is no longer needed.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180094931 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageJobId.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString; + +/** + * Identifier of a job. + * + * A job is equivalent to a job in Flink. + */ +public class TieredStorageJobId extends TieredStorageAbstractId { Review Comment: Removed it. Now I use a partition id directly to release the resource because the partition id is unique. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +/** Configurations for the Tiered Storage. */ +public class TieredStorageConfiguration { + +private enum TierType { +IN_MEM, +IN_DISK, +IN_REMOTE, +} Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180093470 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import java.util.Random; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Utils for reading or writing to tiered store. */ +public class TieredStorageUtils { + +private static final char[] HEX_CHARS = { +'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' +}; + +public static byte[] randomBytes(int length) { Review Comment: Removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180092886 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The abstract unique identification for the Tiered Storage. */ +public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable { + +private static final long serialVersionUID = -948472905048472823L; + +/** ID represented by a byte array. */ +protected final byte[] id; + +/** Pre-calculated hash-code for acceleration. 
*/ +protected final int hashCode; + +public TieredStorageAbstractId(byte[] id) { +checkArgument(id != null, "Must be not null."); + +this.id = id; +this.hashCode = Arrays.hashCode(id); +} + +public TieredStorageAbstractId(int length) { +checkArgument(length > 0, "Must be positive."); + +this.id = randomBytes(length); +this.hashCode = Arrays.hashCode(id); +} + +public byte[] getId() { Review Comment: Renamed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180092466 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import java.util.Random; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Utils for reading or writing to tiered store. */ +public class TieredStorageUtils { + +private static final String TIER_STORE_DIR = "tiered-store"; + +private static final char[] HEX_CHARS = { +'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' +}; + +public static byte[] randomBytes(int length) { +checkArgument(length > 0, "Must be positive."); + +Random random = new Random(); +byte[] bytes = new byte[length]; +random.nextBytes(bytes); +return bytes; +} + +public static String bytesToHexString(byte[] bytes) { Review Comment: Removed, use `StringUtils#byteToHexString` instead. 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The abstract unique identification for the Tiered Storage. */ +public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1180092087 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageAbstractId.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.bytesToHexString; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.randomBytes; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The abstract unique identification for the Tiered Storage. */ +public class TieredStorageAbstractId implements TieredStorageDataIdentifier, Serializable { + +private static final long serialVersionUID = -948472905048472823L; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1164073848 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import java.io.IOException; + +/** + * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be + * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions. + */ +public interface TierStorage { + +/** Setups the {@link TierStorage}. */ +void setup() throws IOException; + +/** Emits the finished {@link Buffer} to a specific subpartition. */ +boolean emit( +int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId) +throws IOException; + +/** Closes the {@link TierStorage}, the opened channels should be closed. */ +void close(); + +/** Releases the {@link TierStorage}, all resources should be released. */ +void release(); Review Comment: I have updated it. 
`TierStorageWriter` now has only a `close` method, and `TierStorage` has only a `release` method.
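The split described in this comment can be pictured with the minimal sketch below, in which the writer only closes and the storage only releases. The interface names `TierStorageWriterSketch` and `TierStorageSketch`, the helper `TierLifecycleSketch`, and the close-before-release order are assumptions for illustration; the Javadoc wording follows the original `close` and `release` descriptions quoted in the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer-side interface: it only needs to be closed. */
interface TierStorageWriterSketch {
    /** Closes the writer; opened channels should be closed here. */
    void close();
}

/** Hypothetical storage-side interface: it only releases resources. */
interface TierStorageSketch {
    /** Releases the storage; all resources should be released here. */
    void release();
}

/** Records the order of lifecycle calls so the separation is easy to observe. */
final class TierLifecycleSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        TierStorageWriterSketch writer = () -> events.add("writer closed");
        TierStorageSketch storage = () -> events.add("storage released");

        // Assumed order: stop writing first, then free the tier's resources.
        writer.close();
        storage.release();
        return events;
    }
}
```

Giving each interface a single lifecycle method makes it unambiguous which object owns which cleanup step.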
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1164072367 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/upstream/common/TierStorage.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.upstream.common; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import java.io.IOException; + +/** + * This {@link TierStorage} is the specific tier storage. The finished {@link Buffer} will be + * emitted to this {@link TierStorage}. Each tier may contain data of all subpartitions. + */ +public interface TierStorage { + +/** Setups the {@link TierStorage}. */ +void setup() throws IOException; + +/** Emits the finished {@link Buffer} to a specific subpartition. */ +boolean emit( +int targetSubpartition, Buffer finishedBuffer, boolean isEndOfPartition, int segmentId) Review Comment: It has been renamed to `write` in `TierStorageWriter`. Only two arguments, `consumerId` and `finishedBuffer`, are kept; `isEndOfPartition` and `segmentId` have been removed from the `write` method.
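A minimal sketch of the narrowed signature, with only `consumerId` and `finishedBuffer` as arguments, might look as follows. The `int` type of `consumerId`, the `boolean` return value (carried over from the old `emit`), the use of `java.nio.ByteBuffer` in place of Flink's `Buffer`, and the names `TierWriteSketch` and `InMemoryTierWriteSketch` are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical writer interface with the two remaining arguments. */
interface TierWriteSketch {
    /** Writes a finished buffer for the given consumer; returns whether it was accepted. */
    boolean write(int consumerId, ByteBuffer finishedBuffer);
}

/** Toy in-memory implementation that groups buffers by consumer. */
final class InMemoryTierWriteSketch implements TierWriteSketch {

    private final Map<Integer, List<ByteBuffer>> buffersByConsumer = new HashMap<>();

    @Override
    public boolean write(int consumerId, ByteBuffer finishedBuffer) {
        buffersByConsumer
                .computeIfAbsent(consumerId, id -> new ArrayList<>())
                .add(finishedBuffer);
        return true; // this toy tier never rejects a buffer
    }

    /** Number of buffers written so far for one consumer. */
    int numBuffers(int consumerId) {
        List<ByteBuffer> buffers = buffersByConsumer.get(consumerId);
        return buffers == null ? 0 : buffers.size();
    }
}
```

With `isEndOfPartition` and `segmentId` gone from the signature, the caller only decides who receives which buffer; any segment bookkeeping would have to live elsewhere.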
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1164069146 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TierType.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +/** The storage types for tiered store. */ +public enum TierType { Review Comment: I have made `TierType` a private enum in `TieredStoreShuffleEnvironment`. The enum does not imply that only these 3 types are supported; it exists only to make adding new tiers easier if needed.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161504837 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. 
*/ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; + +/** + * Before start a new segment, the writer side calls the method to check whether the {@link + * StorageTier} can store the next segment. + */ +boolean canStoreNextSegment(int subpartitionId); Review Comment: The method has been removed from the API. It will be part of the netty-based consumer API in a subsequent change, where we will use the `producerId` instead.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161500134 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. 
*/ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; + +/** + * Before start a new segment, the writer side calls the method to check whether the {@link + * StorageTier} can store the next segment. + */ +boolean canStoreNextSegment(int subpartitionId); + +/** + * Before reading the segment data, the consumer calls the method to check whether the {@link + * StorageTier} has the segment. + */ +boolean hasCurrentSegment(int subpartitionId, int segmentIndex); + +/** Sets the metric group. */ +void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics); Review Comment: Fixed. This can live in the `BufferAccumulator`; it does not need to be in this PR.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502932 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. 
*/ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; + +/** + * Before start a new segment, the writer side calls the method to check whether the {@link + * StorageTier} can store the next segment. + */ +boolean canStoreNextSegment(int subpartitionId); + +/** + * Before reading the segment data, the consumer calls the method to check whether the {@link + * StorageTier} has the segment. + */ +boolean hasCurrentSegment(int subpartitionId, int segmentIndex); + +/** Sets the metric group. */ +void setOutputMetrics(OutputMetrics tieredStoreOutputMetrics); + +void close(); + +void release(); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502777 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { + +/** + * The shuffle records will be accumulated to the finished buffer before writing to the + * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}. + */ +enum TierType implements TieredStoreMode { +IN_CACHE, +IN_MEM, +IN_DISK, +IN_REMOTE, +} + +/** + * Currently, only these tier combinations are supported. If the configured tiers is contained + * in the following combinations, an exception will be thrown. + */ +enum SupportedTierCombinations implements TieredStoreMode { +MEMORY, +MEMORY_DISK, +MEMORY_DISK_REMOTE, Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502620 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreProducerImpl.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulator; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.cache.BufferAccumulatorImpl; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * This is a common entrypoint of emitting records to tiered store. These records will be emitted to + * the {@link BufferAccumulator} to accumulate and transform into finished buffers. 
+ */ +public class TieredStoreProducerImpl implements TieredStoreProducer { + +private final boolean isBroadcastOnly; + +private final int numSubpartitions; + +private final BufferAccumulator bufferAccumulator; + +public TieredStoreProducerImpl( +StorageTier[] storageTiers, +int numSubpartitions, +int bufferSize, +boolean isBroadcastOnly, +@Nullable BufferCompressor bufferCompressor) { +this.isBroadcastOnly = isBroadcastOnly; +this.numSubpartitions = numSubpartitions; + +this.bufferAccumulator = new BufferAccumulatorImpl(); Review Comment: I have updated the producer constructor and made this a constructor argument. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
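The change described in this comment (passing the accumulator into the producer rather than constructing it inside) can be illustrated with a minimal sketch. All types below (`RecordAccumulator`, `ListRecordAccumulator`, `SketchProducer`) are hypothetical stand-ins written for this illustration; they are not the Flink classes from the PR, and the real constructor signature may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the accumulator interface discussed in the review.
interface RecordAccumulator {
    void receive(byte[] record, int subpartitionId);
}

// Hypothetical in-memory implementation that only remembers target subpartitions.
class ListRecordAccumulator implements RecordAccumulator {
    final List<Integer> seenSubpartitions = new ArrayList<>();

    @Override
    public void receive(byte[] record, int subpartitionId) {
        seenSubpartitions.add(subpartitionId);
    }
}

// The producer receives its accumulator as a constructor argument
// instead of calling "new" on a concrete implementation itself.
class SketchProducer {
    private final int numSubpartitions;
    private final RecordAccumulator accumulator;

    SketchProducer(int numSubpartitions, RecordAccumulator accumulator) {
        this.numSubpartitions = numSubpartitions;
        this.accumulator = Objects.requireNonNull(accumulator, "accumulator");
    }

    // Forwards one record to a single subpartition.
    void emit(byte[] record, int subpartitionId) {
        if (subpartitionId < 0 || subpartitionId >= numSubpartitions) {
            throw new IllegalArgumentException("Invalid subpartition: " + subpartitionId);
        }
        accumulator.receive(record, subpartitionId);
    }

    // Forwards one record to every subpartition in index order.
    void broadcast(byte[] record) {
        for (int i = 0; i < numSubpartitions; i++) {
            accumulator.receive(record, i);
        }
    }
}
```

One benefit of this shape is that a test can hand the producer a lightweight accumulator such as `ListRecordAccumulator` and inspect what was forwarded.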
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161502026 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreResultPartition.java: ## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.EndOfData; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.StopMode; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ChannelStateHolder; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.StorageTier; +import org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common.TieredStoreProducer; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.SupplierWithException; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link 
TieredStoreResultPartition} appends records and events to the tiered store, which supports + * the upstream dynamically switches storage tier for writing shuffle data, and the downstream will + * read data from the relevant storage tier. + */ +public class TieredStoreResultPartition extends ResultPartition implements ChannelStateHolder { Review Comment: This is a typo, fixed.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161501622 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreConfiguration.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The configuration for tiered store. */ +public class TieredStoreConfiguration { + +private final String tieredStoreTiers; Review Comment: This change should not be contained in this PR, so I have removed it. I will use an explicit type when submitting the new PR.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161500942 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/MemorySegmentAndChannel.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.Buffer; + +/** {@link MemorySegment} info and the corresponding channel index. */ +public class MemorySegmentAndChannel { Review Comment: This class is used in `BufferAccumulator`. The finished memory segment (without a buffer recycler) should be emitted to each tier.
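As a rough illustration of the idea in this comment (a finished chunk of memory travels to a tier together with the index of the channel it belongs to), here is a minimal sketch. It is not the class from the PR: `SegmentAndChannel` and its `dataSize` field are hypothetical, and `java.nio.ByteBuffer` is used as a stand-in for Flink's `MemorySegment` so that the example compiles on its own.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical value object: a finished piece of memory plus its target channel.
// ByteBuffer is a stand-in for Flink's MemorySegment (assumption for this sketch).
final class SegmentAndChannel {
    private final ByteBuffer segment;
    private final int channelIndex;
    private final int dataSize;

    SegmentAndChannel(ByteBuffer segment, int channelIndex, int dataSize) {
        if (channelIndex < 0) {
            throw new IllegalArgumentException("channelIndex must be non-negative");
        }
        if (dataSize < 0 || dataSize > segment.capacity()) {
            throw new IllegalArgumentException("dataSize out of range");
        }
        this.segment = Objects.requireNonNull(segment, "segment");
        this.channelIndex = channelIndex;
        this.dataSize = dataSize;
    }

    // The memory holding the finished data; no recycler is attached here,
    // so whoever receives this object decides how the memory is released.
    ByteBuffer getSegment() {
        return segment;
    }

    // Index of the channel (subpartition) the data belongs to.
    int getChannelIndex() {
        return channelIndex;
    }

    // Number of valid bytes at the start of the segment.
    int getDataSize() {
        return dataSize;
    }
}
```

Keeping the recycler out of this pair means the receiving tier can wrap the memory in its own buffer type with whatever release policy it needs.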
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161500038 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. 
*/ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; + +/** + * Before start a new segment, the writer side calls the method to check whether the {@link + * StorageTier} can store the next segment. + */ +boolean canStoreNextSegment(int subpartitionId); + +/** + * Before reading the segment data, the consumer calls the method to check whether the {@link + * StorageTier} has the segment. + */ +boolean hasCurrentSegment(int subpartitionId, int segmentIndex); Review Comment: The method has been removed from the API. It will be part of the netty-based upstream consumer API in a subsequent PR. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. 
+ * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. */ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; + +/** + * Before start a new segment, the writer side calls the method to check whether the {@link + * StorageTier} can store the next segment. + */ +boolean canStoreNextSegment(int subpartitionId); + +/** + * Before reading the segment data, the consumer calls the method to check whether the {@link + * StorageTier} has the segment. + */ +boolean hasCurrentSegment(int subpartitionId, int segmentIndex); + +/** Sets th
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161499623 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; + +/** Create the {@link TierWriter} of the {@link StorageTier} to write shuffle data. */ +TierWriter createPartitionTierWriter(); + +/** Create the {@link TierReaderView} of the {@link StorageTier} to read shuffle data. 
*/ +TierReaderView createTierReaderView( +int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException; Review Comment: OK, it has been removed from the API. The method will be part of the netty-based upstream consumer API in a subsequent PR.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161498735 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { + +/** + * The shuffle records will be accumulated to the finished buffer before writing to the + * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}. + */ +enum TierType implements TieredStoreMode { +IN_CACHE, +IN_MEM, +IN_DISK, +IN_REMOTE, Review Comment: Fixed. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/common/StorageTier.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream.common; + +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; + +import java.io.IOException; + +/** + * The single storage tier for the tiered store. The storage tiers is independent of each other. + * Through the {@link StorageTier}, we can create {@link TierWriter} to write shuffle data and + * create the {@link TierReaderView} to consume shuffle data. + */ +public interface StorageTier { + +void setup() throws IOException; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161498489 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { + +/** + * The shuffle records will be accumulated to the finished buffer before writing to the + * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}. + */ +enum TierType implements TieredStoreMode { +IN_CACHE, +IN_MEM, +IN_DISK, +IN_REMOTE, +} + +/** + * Currently, only these tier combinations are supported. If the configured tiers is contained + * in the following combinations, an exception will be thrown. + */ +enum SupportedTierCombinations implements TieredStoreMode { +MEMORY, +MEMORY_DISK, +MEMORY_DISK_REMOTE, +} Review Comment: `SupportedTierCombinations` is removed.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161498297 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { + +/** + * The shuffle records will be accumulated to the finished buffer before writing to the + * corresponding tier and the accumulator is the tier of {@link TierType#IN_CACHE}. Review Comment: `IN_CACHE` is removed from `TierType`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161498092 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; + +/** The tiered store mode for {@link TieredStoreResultPartition}. */ +public interface TieredStoreMode { Review Comment: `TieredStoreMode` is removed; we use `TierType` in `o.a.f.r.io.network.partition.tieredstore` instead.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture
TanYuxin-tyx commented on code in PR #22330: URL: https://github.com/apache/flink/pull/22330#discussion_r1161497423 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/tieredstore/upstream/TieredStoreMode.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.tieredstore.upstream; Review Comment: I have moved these classes to `o.a.f.r.i.n.partition.hybrid.tiered`. The `TieredStoreMode` is removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
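The last few comments describe collapsing `TieredStoreMode` into a standalone `TierType` enum without `IN_CACHE`. A minimal sketch of what such an enum could look like follows. The name `SketchTierType`, the assumption that the remaining constants keep their original names, and the `SketchTierParser` helper with its comma-separated input format are all hypothetical; none of them is taken from the merged code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone tier enum: no wrapping interface and no IN_CACHE constant.
// Assumes the remaining constants keep the names shown in the quoted diff.
enum SketchTierType {
    IN_MEM,
    IN_DISK,
    IN_REMOTE
}

// Hypothetical helper that turns a comma-separated setting into tier types.
final class SketchTierParser {
    private SketchTierParser() {}

    // Throws IllegalArgumentException (from Enum.valueOf) for unknown tier names.
    static List<SketchTierType> parse(String configuredTiers) {
        List<SketchTierType> tiers = new ArrayList<>();
        for (String name : configuredTiers.split(",")) {
            tiers.add(SketchTierType.valueOf(name.trim()));
        }
        return tiers;
    }
}
```

With this shape, a stale configuration that still names `IN_CACHE` fails fast at parse time rather than silently selecting a tier that no longer exists.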