Обёртка для testcontainers Kafka. Умеет читать из контейнера, писать в контейнер и очищать топик.
import lombok.Getter;
import lombok.SneakyThrows;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
public class KafkaContainerWrapper {
private static final String IMAGE = "confluentinc/cp-kafka";
private static final String DELETE_MESSAGES = "{\"partitions\": [{\"topic\": \"%TOPIC%\", \"partition\": 0, \"offset\": -1}], \"version\": 1}";
private static final String DELETE_MESSAGES_FILE_PATH = "./delete-messages.json";
private static final String CONTAINER_INTERNAL_BOOTSTRAP = "localhost:9092";
@Getter
private final String topic;
@Getter
private final KafkaContainer container;
public KafkaContainerWrapper(String topic) {
this.topic = topic;
container = new KafkaContainer(
DockerImageName.parse(IMAGE).asCompatibleSubstituteFor("confluentinc/cp-kafka")
)
.waitingFor(Wait.forListeningPort());
container.start();
}
public KafkaContainerWrapper() {
this("default-topic");
}
@SneakyThrows
public ExecResult sendMessage(String message) {
return container.execInContainer(
"sh", "-c", "echo '%s' | ".formatted(message)
+ " kafka-console-producer"
+ " --topic %s".formatted(topic)
+ " --bootstrap-server %s".formatted(CONTAINER_INTERNAL_BOOTSTRAP)
);
}
@SneakyThrows
public ExecResult readFirstMessage() {
return container.execInContainer(
"kafka-console-consumer",
"--topic", topic,
"--offset", "earliest",
"--partition", "0",
"--max-messages", "1",
"--timeout-ms", "1000",
"--bootstrap-server", CONTAINER_INTERNAL_BOOTSTRAP
);
}
@SneakyThrows
public ExecResult clearMessages() {
container.execInContainer(
"sh", "-c",
"echo '%s'".formatted(DELETE_MESSAGES.replace("%TOPIC%", topic))
+ " | tee %s".formatted(DELETE_MESSAGES_FILE_PATH)
);
return container.execInContainer(
"kafka-delete-records",
"--offset-json-file", DELETE_MESSAGES_FILE_PATH,
"--bootstrap-server", CONTAINER_INTERNAL_BOOTSTRAP
);
}
public void initProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", container::getBootstrapServers);
registry.add("spring.kafka.consumer.topic.name", this::getTopic);
}
}