To optimize our MQ consumer, we added connection pooling and concurrency to it. Below I describe some problems we ran into during development.
First of all, please download the commons dbcp, pool and connection jars from http://jakarta.apache.org.
consumer.java creates the connection pools and registers them with the DriverManager.
DBO.java is a small class that encapsulates the DB operations.
consumer.java
————————————
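import java.sql.DriverManager;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
import org.apache.commons.dbcp.PoolingDriver;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;

public static void main(String[] args){
    try{
        // Load the MySQL driver, then register one pool per tag.
        // Both calls throw checked exceptions, so they belong inside the try block.
        Class.forName("com.mysql.jdbc.Driver");
        smarterConsumer.setupDriver("T1");
        smarterConsumer.setupDriver("T2");
        DBO dboT1 = new DBO("T1");
        DBO dboT2 = new DBO("T2");
        dboT1.execUpdate("create table Test1(ID int(4))");
        dboT2.execUpdate("create table Test2(ID int(5))");
    }catch(Exception e){
        System.out.println(e.getMessage());
    }
}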
// static, because main() calls it without an instance
public static void setupDriver(String tag) throws Exception {
    String connectURI = "jdbc:mysql://localhost/";
    // Tag T1 maps to db1; any other tag maps to db2
    if(tag.equalsIgnoreCase("T1")){
        connectURI = connectURI + "db1?user=user&password=***";
    }else{
        connectURI = connectURI + "db2?user=user&password=***";
    }
    ObjectPool connectionPool = new GenericObjectPool(null);
    ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(connectURI,null);
    // The constructor installs itself as the pool's object factory, so the variable is not used afterwards
    PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(connectionFactory,connectionPool,null,null,false,true);
    // Load the pooling driver and register this pool under the name "DS" + tag
    Class.forName("org.apache.commons.dbcp.PoolingDriver");
    PoolingDriver driver = (PoolingDriver) DriverManager.getDriver("jdbc:apache:commons:dbcp:");
    driver.registerPool("DS"+tag,connectionPool);
}
DBO.java
————————————
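import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Instance field, not static: each DBO holds its own pooled connection
private Connection conn;

public DBO(String tag) {
    try{
        // Borrow a connection from the pool registered as "DS" + tag
        conn = DriverManager.getConnection("jdbc:apache:commons:dbcp:DS"+tag);
    }catch(Exception namingE){
        System.out.println(namingE.getMessage());
    }
}
public int execUpdate(String sql) throws SQLException {
    int rs = 0;
    Statement stmt = conn.createStatement();
    try {
        rs = stmt.executeUpdate(sql);
    } catch (SQLException sqle) {
        System.out.println("SQLErr:" + sqle.getMessage());
    }finally{
        stmt.close();
    }
    return rs;
}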
Please pay attention to the connections held by the DBO instances. Do not declare the connection field static; otherwise all DBO instances share a single connection and the T1 and T2 connections get mixed up.
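To see why a static field mixes things up, here is a minimal sketch that uses plain strings in place of real connections. The classes SharedDbo and OwnDbo are hypothetical stand-ins invented for this illustration, they are not part of DBO.java.

```java
public class StaticFieldDemo {

    /** Anti-pattern: one static slot shared by every instance. */
    public static class SharedDbo {
        static String conn; // stand-in for a static Connection field
        public SharedDbo(String tag) { conn = "DS" + tag; }
        public String target() { return conn; }
    }

    /** Each instance keeps its own value. */
    public static class OwnDbo {
        private final String conn; // stand-in for an instance Connection field
        public OwnDbo(String tag) { this.conn = "DS" + tag; }
        public String target() { return conn; }
    }

    public static void main(String[] args) {
        SharedDbo s1 = new SharedDbo("T1");
        SharedDbo s2 = new SharedDbo("T2"); // overwrites the slot s1 also reads
        System.out.println(s1.target());    // prints DST2

        OwnDbo o1 = new OwnDbo("T1");
        OwnDbo o2 = new OwnDbo("T2");
        System.out.println(o1.target());    // prints DST1
    }
}
```

With the static field, the second constructor call silently redirects the first instance, which is the confusion described above.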
When you are done with a DBO instance, remember to close its connection before you abandon the instance.
The connection handed out by DBCP is a PoolableConnection: its close() method returns the connection to the pool, while reallyClose() actually releases the underlying connection.
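The difference between the two calls can be illustrated with a toy model. This is only a sketch of the pooling idea, not DBCP's implementation: ToyPool, Handle and borrow() are hypothetical names, and only close() and reallyClose() mirror the method names mentioned above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ToyPool {
    private final Deque<Handle> idle = new ArrayDeque<>();
    private int created = 0;

    /** Stand-in for a pooled connection. */
    public class Handle {
        public final int id;
        private boolean released = false;

        Handle(int id) { this.id = id; }

        /** Hands the object back to the pool so it can be reused. */
        public void close() {
            if (!released && !idle.contains(this)) {
                idle.push(this);
            }
        }

        /** Discards the object for good; the pool never hands it out again. */
        public void reallyClose() {
            released = true;
            idle.remove(this);
        }
    }

    /** Reuses an idle handle if there is one, otherwise creates a new one. */
    public Handle borrow() {
        Handle h = idle.poll();
        return (h != null) ? h : new Handle(++created);
    }

    public int idleCount() { return idle.size(); }

    public int createdCount() { return created; }
}
```

Borrowing after close() gives back the same handle, so nothing new is created. Borrowing after reallyClose() forces the pool to create a fresh one.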
I'd like to explain this statement:
conn = DriverManager.getConnection("jdbc:apache:commons:dbcp:DS"+tag);
The DriverManager dispatches the request to the registered drivers by passing the URL to each of them in turn. A driver that can handle the URL returns true from acceptsURL(), and the first driver that recognizes the URL supplies the connection.
URLs of the form jdbc:apache:commons:dbcp:* are handled by DBCP, while jdbc:mysql:* URLs are handled directly by mysql-connector-java-3.1.4-bin.jar.
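The dispatch by URL prefix can be tried with the JDK alone. The following is a minimal sketch: PrefixDriver is a hypothetical toy driver, and the prefixes jdbc:demo:pool: and jdbc:demo:direct: are made up so that they do not clash with any real driver. Only the java.sql calls (DriverManager.registerDriver, DriverManager.getDriver, Driver.acceptsURL) are the real API.

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

public class DispatchDemo {

    /** Toy driver that claims every URL starting with a fixed prefix. */
    public static class PrefixDriver implements Driver {
        private final String prefix;

        public PrefixDriver(String prefix) { this.prefix = prefix; }

        @Override public boolean acceptsURL(String url) {
            return url != null && url.startsWith(prefix);
        }
        @Override public Connection connect(String url, Properties info) throws SQLException {
            // JDBC contract: return null for URLs this driver does not handle
            if (!acceptsURL(url)) return null;
            throw new SQLException("toy driver: no real connections");
        }
        @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
            return new DriverPropertyInfo[0];
        }
        @Override public int getMajorVersion() { return 1; }
        @Override public int getMinorVersion() { return 0; }
        @Override public boolean jdbcCompliant() { return false; }
        @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            throw new SQLFeatureNotSupportedException();
        }
    }

    /** Registers two toy drivers with the DriverManager. */
    public static void registerToyDrivers() {
        try {
            DriverManager.registerDriver(new PrefixDriver("jdbc:demo:pool:"));
            DriverManager.registerDriver(new PrefixDriver("jdbc:demo:direct:"));
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns the prefix of the toy driver that claims the URL, or "none". */
    public static String ownerOf(String url) {
        try {
            Driver d = DriverManager.getDriver(url);
            return (d instanceof PrefixDriver) ? ((PrefixDriver) d).prefix : d.getClass().getName();
        } catch (SQLException e) {
            return "none"; // no registered driver accepted the URL
        }
    }

    public static void main(String[] args) {
        registerToyDrivers();
        System.out.println(ownerOf("jdbc:demo:pool:DST1"));
        System.out.println(ownerOf("jdbc:demo:direct://localhost/db1"));
        System.out.println(ownerOf("jdbc:unknown:x"));
    }
}
```

Each URL ends up with the driver whose prefix matches it, in the same way that the DBCP and MySQL URLs end up with different drivers even though both go through the same DriverManager.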
For reference, see the API docs:
http://jakarta.apache.org/commons/dbcp/apidocs/index.html
http://jakarta.apache.org/commons/pool/apidocs/index.html