Physical Address

304 North Cardinal St.
Dorchester Center, MA 02124

Zarządzanie połączeniami bazodanowymi w środowisku wielowątkowym: Implementacja wzorca ObjectPool z wykorzystaniem JDBC i testowanie z JCStress

Hibernate czy inne ORM’y są bardzo wygodne w użyciu, ale czasami może zajść konieczność posiadania większej kontroli i wglądu do tego co się dzieje “pod spodem”.  Jeśli używamy Spring Boota to dołączając do naszego projektu zależność spring data jpa, właśnie używamy Hibernate’a. Niżej mamy JdbcTemplate zawarty w module spring data jdbc, ale nawet on parę rzeczy upraszcza, np zarządzanie połączeniami, które oddelegowuje do różnych implementacji DataSource jak HikariCP. W tym artykule zajmiemy się surowym JDBC i stworzymy własną klasę do zarządzania połączeniami, a następnie przetestujemy jej działanie w środowisku wielowątkowym z użyciem jcstress

Zacznijmy od krótkiego wyjaśnienia czym jest wzorzec ObjectPool, bo właśnie jego wariancje będziemy implementować.

ObjectPool jest to wzorzec z kategorii kreacyjnych. Stosuje się go najczęściej w przypadku zarządzania obiektami, które są kosztowne w tworzeniu albo ich dostępność jest ograniczona

Z punktu widzenia wzorca najważniejsze są metody take() release(), ponieważ pozwalają na pobranie i oddanie obiektu do puli, przez możemy wielokrotnie używać raz stworzonych obiektów i jest to największa zaleta tego wzorca.

Zajmijmy się więc implementajcą. Z racji, że mamy sporo argumentów do przekazania, użyje tutaj jeszcze wzorca Builder, ale jego omówieniem, zajmę się w innym artykule.

Oprócz podstawowej implementacji, dodałem tutaj również mechanizm sprawdzania czy dany obiekt Connection jest poprawny i został stworzony przez nas. Zawsze się może trafić jakiś śmieszek co wrzuci do puli połączenie do czegoś innego ;). Oprócz tego MAX_DB_PING_TIMEOUT określa, jak długo będziemy czekać na ewentualnego pinga z bazy danych, podczas sprawdzania czy dane połączenie działa. Zamiast zwracać nulla w przypadku pustej puli zwracam tutaj Optionala w metodzie take. Chciałem zasygnalizować tym samym, że klient musi obsłużyć sytuację, że pula może być po prostu pusta,

public class BadConnectionPool implements ObjectPool<Connection> {
private static final int MAX_DB_PING_TIMEOUT = 2;
private final Queue<Connection> pool;
private final Set<Connection> usedConnections;

private BadConnectionPool(QuizDataSourceBuilder builder) {

this.pool = new LinkedList<>();
this.usedConnections = new HashSet<>();
try {
for (int i = 0; i < builder.poolSize; i++) {
Connection connection = DriverManager.getConnection(
builder.url, builder.user, builder.password
);
pool.add(connection);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public Optional<Connection> take() {
Connection connection = pool.poll();

if (!isConnectionValid(connection)) {
return Optional.empty();
}

usedConnections.add(connection);
return Optional.of(connection);
}

public void release(Connection connection) {
if (isConnectionValid(connection) && usedConnections.remove(connection)) {
pool.offer(connection);
}
}

private boolean isConnectionValid(Connection connection) {
if (connection == null) {
return false;
}
try {
return !connection.isClosed() && connection.isValid(MAX_DB_PING_TIMEOUT);
} catch (SQLException e) {
return false;
}
}

@Override
public int getSize() {
return pool.size();
}

public static class QuizDataSourceBuilder { . . . }
}

Wszystko fajnie, tylko nie bez powodu ta klasa się nazywa BadConnectionPool, ale nie uprzedzajmy faktów. Teraz przejdźmy do jcstress. Aby użyć tego modułu w projekcie musimy dodać zależność do  pom.xml

<dependency>
<groupId>org.openjdk.jcstress</groupId>
<artifactId>jcstress-core</artifactId>
<version>0.16</version>
<scope>test</scope>
</dependency>

Oprócz tego trzeba zainstalować plugin JCSTRESS.

Link do githuba pluginu 

 

Struktura stress testu wygląda następująco:

 

@JCStressTest – adnotacja oznaczająca klase jako test jcstress

@Outcome(String[] id, Expect expect, String description) – adnotacja określająca wynik testu, możemy określić jakie warunki ma spełniać oczekiwany wynik, a jakie błędny.

@State – określa stan współdzielony między wątkami

@Actor – tą adnotacją oznaczamy metodę, która będzie wykonywana równolegle w wielu wątkach

@Arbiter – oznaczamy nią metodę, która się wykona raz na koniec testu. Służy do zebrania wyników. Jako parametr przyjmuje ona klasy takie jak:

  • I_Result – wynik jednoelementowy
  • II_Result – wynik dwuelementowy

itd.

 

@JCStressTest
@Outcome(id = "99, 0", expect = ACCEPTABLE,
desc = "All connections were returned and corrupted"
+ " connection was rejected by pool")
@Outcome(id = "99, 1", expect = FORBIDDEN,
desc = "One connection was closed and pool did not take it back,"
+ " registered one corrupted connection")
@Outcome(id = "100, 0", expect = FORBIDDEN,
desc = "One connection was closed and successfully returned to pool")
@Outcome(id = "100, 1", expect = FORBIDDEN,
desc = "One connection was closed and successfully returned"
+ " to pool plus registered one corrupted connection")
@State
public class ConnectionPoolTest {
private final ObjectPool<Connection> pool;
private final AtomicInteger incorrectConnectionCount = new AtomicInteger(0);
private final AtomicInteger activeConnectionsMax = new AtomicInteger(0);

public ConnectionPoolTest() {
pool = GoodConnectionPool.QuizDataSourceBuilder.builder()
.url("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1")
.user("sa")
.password("")
.poolSize(100)
.driver("org.h2.Driver")
.build();
}

@Actor
public void actor1() {
Optional<Connection> take = pool.take();
if (take.isPresent()) {
Connection connection = take.get();
try {
connection.getSchema();
connection.close();
} catch (SQLException e) {
incorrectConnectionCount.incrementAndGet();
} finally {
pool.release(connection);
activeConnectionsMax.decrementAndGet();
}
} else {
incorrectConnectionCount.incrementAndGet();
}
}

@Actor
public void actor2() {
for (int i = 0; i < 10; i++) {
Optional<Connection> connectionOpt = pool.take();
if (connectionOpt.isPresent()) {
Connection connection = connectionOpt.get();
pool.release(connection);
} else {
incorrectConnectionCount.incrementAndGet();
}
}
}

@Arbiter
public void arbiter(II_Result r) {
r.r1 = pool.getSize();
r.r2 = incorrectConnectionCount.get();
}
}

Zadaniem @Actor nr 1 jest pobranie połączenia, następnie wykonuje akcje na połączeniu, wszystko w celu zasymulowania pracy z obiektem Connection i zwiększenia szansy na race condition. Na koniec zamykam połączenie (co jest niepoprawne, ponieważ to właśnie domyślnie ObjectPool powinien się tym ewentualnie zajmować) zwalniam połączenie. W między czasie rejestruje ilość nieprawidłowych prób pobrania połączenia.

@Actor 2 Pobiera i zamyka połączenie 10 razy.

A tak się prezentują wyniki, jak widzimy w większości przypadków wszystko poszło w porządku, lecz w ponad 3% przypadków nastąpiły różnego rodzaju anomalie, które są niedopuszczalne w kodzie produkcyjnym. 

Spróbujmy wykorzystać tą pulę w praktyce.  Zadeklarujmy 100 wątków które pobiorą ten sam rekord z bazy danych i inkrementują wartość jego 1. Zarówno SELECT FOR UPDATE jak i UPDATE zostaną wykonane w transakcji. 

SELECT FOR UPDATE to instrukcja SQL służąca do blokowania wybranych wierszy w bazie danych podczas transakcji. Jest to kluczowy mechanizm zapewniający spójność danych w środowisku współbieżnym. 

Jak widzimy mamy całkowity rozjazd, co zaskakujące w jednym przypadku udało się zrealizować wszystkie aktualizacje poprawnie a winnym przeszło tylko 6.

Teraz czas naprawić tą pule, mamy klika opcji takich m.in. ReetrantLock, blok synchronized, ReadWriteLock, kolekcje z java.util.concurrent…

Każdy obiekt w Javie ma związany z nim monitor, który działa jako strażnik kontrolujący dostęp do kodu synchronizowanego na tym obiekcie. Jeśli byśmy chcieli użyć bloku synchronized to może to wyglądać w następujący sposób:

public void release(Connection connection) {
if (isConnectionValid(connection) &&
usedConnections.remove(connection)) {
synchronized (lock) {
pool.offer(connection);
}
semaphore.release();
}
}

W naszym przypadku jednak użyje kolekcji z java.util.concurrent, ponieważ poprzez to zwiększy się czytelność kodu. Bloki synchronized wydają mi się mieć sens w bardziej specjalistycznych przypadkach, gdy konkretny fragment kodu wymaga synchronizacji, a my chcemy zapewnić atomowość operacji na tylko na kolekcji. Użyjemy więc następujących kolekcji.

this.pool = new LinkedBlockingQueue<>();
this.usedConnections = ConcurrentHashMap.newKeySet();

Sama zmiana kolekcje na wersje bezpieczne wielowątkowo, pozwiła testowi przejść. Pierwsza wartość to liczba połączeń w puli, druga to ile zostało wykrytych nieprawidłowo pobranych połączeń.

Niżej wklejam kod którego używałem do testowania. Można, zauważyć, że używam tutaj innej implementacji ObjectPool, którą pozyskuje z fabryki pul wątków Executors. Jest to pula wątków o stałym rozmiarze.

Ciekawą sytuacją, którą napotkałem i nie wiedziałem z czego wynika było to, że pomimo stress testy przechodziły pomyślnie, wciąż widziałem błędny wynik w bazie danych, pomimo, że oba zapytania wykonywane są w transakcji. Oprócz niej musiałem użyć SELECT FOR UPDATE, który zakłada locka na dany rekord w bazie danych do końca trwania transakcji.

public class App {
public static void main(String[] args) throws SQLException {
ObjectPool<Connection> pool = BadConnectionPool.QuizDataSourceBuilder.builder()
.url("jdbc:mysql://localhost:3306/quizdb")
.user("root")
.password("root")
.driver("com.mysql.cj.jdbc.Driver")
.poolSize(100)
.build();

int testValue = 0;
Optional<Connection> initialConnectionOpt = pool.take();
if (initialConnectionOpt.isPresent()) {
Connection connection = initialConnectionOpt.get();
try {
Statement statement = connection.createStatement();
statement.execute("DELETE FROM test");
String sql = String.format("INSERT INTO test (value) VALUES (%d);", testValue);
statement.execute(sql);
statement.close();
} finally {
pool.release(connection);
}
}

Mapper<Test> mapper = new Mapper<>(Test.class);
ExecutorService executorService = Executors.newFixedThreadPool(100);

for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
Optional<Connection> optionalConnection = pool.take();
if (optionalConnection.isPresent()) {
Connection connection = optionalConnection.get();
try {
connection.setAutoCommit(false);
Statement stat = connection.createStatement();
ResultSet resultSet = stat.executeQuery("SELECT * FROM test FOR UPDATE");
List<Test> map = mapper.map(resultSet);
Test test = map.get(0);
String updateQuery = String.format(
"UPDATE test SET value=%d WHERE id=%d;", test.value() + 1, test.id());
stat.executeUpdate(updateQuery);

connection.commit();
resultSet.close();
stat.close();
connection.setAutoCommit(true);

} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
pool.release(connection);
}
}
});
}
executorService.shutdown();
}
}

Podsumowanie

W tym artykule omówiłem jak zacząć przygodę z  testowaniem kodu wielowątkowego z użyciem jcstress. Jako obiekt testowy posłużyla mi własna implementacja wzorca Object Pool dla połączeń bazodanowych. Przedstawiłem skutki używania kodu, który nie jest bezpieczny wielowątkowo i pokazałem jak można sprawić by był thread-safe

Link do kodu źródłowego z artykułu:

https://github.com/newmon13/jcstress-testing-example

Leave a Reply

Your email address will not be published. Required fields are marked *