我需要一个java代码,每3秒从数据库的aws_questions表里捞取enbale为0的最新5条数据,放到java程序的一个任务队列里。同时定义一个5个大小的线程池去处理任务队列里的数据,线程处理完就会去任务队列里捞取新的任务。要注意不要并发处理。
好的,以下是一个示例Java代码,可以完成你的要求:
import java.sql.*; import java.util.*; import java.util.concurrent.*; public class DatabasePollingExample { private static final String DB_URL = "jdbc:mysql://your_database_url_here"; private static final String DB_USER = "your_database_username_here"; private static final String DB_PASSWORD = "your_database_password_here"; private static final int MAX_THREADS = 5; private static final int QUEUE_CAPACITY = 5; public static void main(String[] args) { BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREADS); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // Connect to database Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD); // Query for the latest 5 rows with enable=0 String sql = "SELECT * FROM aws_questions WHERE enable = 0 ORDER BY id DESC LIMIT 5"; Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql); // Add tasks to the queue while (rs.next()) { String question = rs.getString("question"); taskQueue.put(question); } // Close database connections rs.close(); stmt.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } } }, 0, 3000); while (true) { try { // Take a task from the queue String question = taskQueue.take(); // Submit task to thread pool threadPool.submit(() -> { // Process the task here // ... // Sleep for 1 second to simulate work try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } catch (InterruptedException e) { e.printStackTrace(); } } } }
说明:
DB_URL
, DB_USER
, 和 DB_PASSWORD
分别是你的数据库的连接信息,需要替换成你自己的。
MAX_THREADS
和 QUEUE_CAPACITY
分别是线程池大小和任务队列大小,可以根据你的需求进行调整。
Timer
用于定时执行数据库查询任务,每3秒执行一次。
查询结果按照id降序排列,最多返回5条记录。
使用 BlockingQueue
存储任务队列,如果队列满了,put()
方法会阻塞,直到队列有空位。
使用 ExecutorService
创建线程池,提交任务时会自动分配线程来执行。
在线程执行任务时加入 Thread.sleep(1000)
,以便模拟实际的处理工作,如果没有这个 sleep()
,线程处理完任务后会立即去任务队列里捞取新的任务,导致并发处理。
代码中的异常处理部分可以根据需要进行修改。