Kafka testcontainer wrapper

Обёртка для testcontainers Kafka. Умеет читать из контейнера, писать в контейнер и очищать топик.

import lombok.Getter;
import lombok.SneakyThrows;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerWrapper {

    private static final String IMAGE = "confluentinc/cp-kafka";
    private static final String DELETE_MESSAGES = "{\"partitions\": [{\"topic\": \"%TOPIC%\", \"partition\": 0, \"offset\": -1}], \"version\": 1}";
    private static final String DELETE_MESSAGES_FILE_PATH = "./delete-messages.json";
    private static final String CONTAINER_INTERNAL_BOOTSTRAP = "localhost:9092";
    @Getter
    private final String topic;
    @Getter
    private final KafkaContainer container;

    public KafkaContainerWrapper(String topic) {
        this.topic = topic;
        container = new KafkaContainer(
            DockerImageName.parse(IMAGE).asCompatibleSubstituteFor("confluentinc/cp-kafka")
        )
            .waitingFor(Wait.forListeningPort());
        container.start();
    }

    public KafkaContainerWrapper() {
        this("default-topic");
    }

    @SneakyThrows
    public ExecResult sendMessage(String message) {
        return container.execInContainer(
            "sh", "-c", "echo '%s' | ".formatted(message)
                + " kafka-console-producer"
                + " --topic %s".formatted(topic)
                + " --bootstrap-server %s".formatted(CONTAINER_INTERNAL_BOOTSTRAP)
        );
    }

    @SneakyThrows
    public ExecResult readFirstMessage() {
        return container.execInContainer(
            "kafka-console-consumer",
            "--topic", topic,
            "--offset", "earliest",
            "--partition", "0",
            "--max-messages", "1",
            "--timeout-ms", "1000",
            "--bootstrap-server", CONTAINER_INTERNAL_BOOTSTRAP
        );
    }

    @SneakyThrows
    public ExecResult clearMessages() {
        container.execInContainer(
            "sh", "-c",
            "echo '%s'".formatted(DELETE_MESSAGES.replace("%TOPIC%", topic))
                + " | tee %s".formatted(DELETE_MESSAGES_FILE_PATH)
        );
        return container.execInContainer(
            "kafka-delete-records",
            "--offset-json-file", DELETE_MESSAGES_FILE_PATH,
            "--bootstrap-server", CONTAINER_INTERNAL_BOOTSTRAP
        );
    }

    public void initProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", container::getBootstrapServers);
        registry.add("spring.kafka.consumer.topic.name", this::getTopic);
    }

}