Java實現任務調度FIFO隊列策略,LinkedBlockingDeque使用(附代碼) | 實用代碼架構

文章目錄

  • 前言
  • FIFO任務調度器架構
  • 示例代碼
  • 總結

前言

在工作中,很多高並發的場景中,我們會用到隊列來實現大量的任務請求。當任務需要某些特殊資源的時候,我們還需要合理的分配資源,讓隊列中的任務高效且有序完成任務。熟悉分佈式的話,應該瞭解yarn的任務調度算法。本文主要用java實現一個FIFO(先進先出調度器),這也是常見的一種調度方式。

FIFO任務調度器架構

主要實現的邏輯可以歸納為:

1、任務隊列主要是單隊列,所有任務按照順序進入隊列後,也會按照順序執行。
2、如果任務無法獲得資源,則將任務塞回隊列原位置。

示例代碼

Maven依賴如下:

      <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <optional>true</optional>        </dependency>                <dependency>            <groupId>cn.hutool</groupId>            <artifactId>hutool-all</artifactId>            <version>5.5.2</version>        </dependency>

具體的原理就不細說瞭,通過代碼我們看看FIFO任務調度策略是什麼玩的吧。下面的代碼也可以作為參考。我們會使用到一個雙向阻塞隊列LinkedBlockingDeque。後面的代碼說明會提到。

package ai.guiji.csdn.dispatch;import cn.hutool.core.thread.ThreadUtil;import lombok.Builder;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.CustomizableThreadFactory;import java.util.Random;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;import java.util.stream.IntStream;/** * @Program: csdn @ClassName: FIFODemo @Author: 劍客阿良_ALiang @Date: 2021-12-24 21:21 @Description: * fifo隊列 @Version: V1.0 */@Slf4jpublic class FIFODemo {  private static final LinkedBlockingDeque<Task> TASK_QUEUE = new LinkedBlockingDeque<>();  private static final ConcurrentHashMap<Integer, LinkedBlockingQueue<Resource>> RESOURCE_MAP =      new ConcurrentHashMap<>();  private static final ExecutorService TASK_POOL =      new ThreadPoolExecutor(          8,          16,          0L,          TimeUnit.MILLISECONDS,          new LinkedBlockingQueue<>(),          new CustomizableThreadFactory("TASK-THREAD-"),          new ThreadPoolExecutor.AbortPolicy());  private static final ScheduledExecutorService ENGINE_POOL =      Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("ENGINE-"));  private static final AtomicInteger CODE_BUILDER = new AtomicInteger(0);  @Data  @Builder  private static class Resource {    private Integer rId;    private Type type;  }  @Data  @Builder  private static class Task implements Runnable {    private Integer tId;    private Runnable work;    private Type type;    private Resource resource;    @Override    public void run() {      log.info("[{}]任務,使用資源編號:[{}]", tId, resource.getRId());      try {        work.run();      } catch (Exception exception) {        exception.printStackTrace();      } finally {        log.info("[{}]任務結束,回歸資源", tId);        returnResource(resource);      }    }  }  private enum Type {    /** 資源類型 */    A("A資源", 1),    B("B資源", 2),    C("C資源", 3);    private final String desc;    private final Integer code;    Type(String desc, Integer code) {      this.desc = desc;      this.code = code;    }    public String getDesc() {      return desc;    }    public Integer getCode() {      return code;    }  }  public static void initResource() {    Random random = new Random();    int aCount = random.nextInt(10) + 1;    int bCount = random.nextInt(10) + 1;    int cCount = random.nextInt(10) + 1;    RESOURCE_MAP.put(Type.A.getCode(), new LinkedBlockingQueue<>());    RESOURCE_MAP.put(Type.B.getCode(), new LinkedBlockingQueue<>());    RESOURCE_MAP.put(Type.C.getCode(), new LinkedBlockingQueue<>());    IntStream.rangeClosed(1, aCount)        .forEach(            a ->                RESOURCE_MAP                    .get(Type.A.getCode())                    .add(Resource.builder().rId(a).type(Type.A).build()));    IntStream.rangeClosed(1, bCount)        .forEach(            a ->                RESOURCE_MAP                    .get(Type.B.getCode())                    .add(Resource.builder().rId(a).type(Type.B).build()));    IntStream.rangeClosed(1, cCount)        .forEach(            a ->                RESOURCE_MAP                    .get(Type.C.getCode())                    .add(Resource.builder().rId(a).type(Type.C).build()));    log.info("初始化資源A數量:{},資源B數量:{},資源C數量:{}", aCount, bCount, cCount);  }  public static Resource extractResource(Type type) {    return RESOURCE_MAP.get(type.getCode()).poll();  }  public static void returnResource(Resource resource) {    log.info("開始歸還資源,rId:{},資源類型:{}", resource.getRId(), resource.getType().getDesc());    RESOURCE_MAP.get(resource.getType().code).add(resource);    log.info("歸還資源完成,rId:{},資源類型:{}", resource.getRId(), resource.getType().getDesc());  }  public static void enginDo() {    ENGINE_POOL.scheduleAtFixedRate(        () -> {          Task task = TASK_QUEUE.poll();          if (task == null) {            log.info("任務隊列為空,無需要執行的任務");          } else {            Resource resource = extractResource(task.getType());            if (resource == null) {              log.info("[{}]任務無法獲取[{}],返回隊列", task.getTId(), task.getType().getDesc());              TASK_QUEUE.addFirst(task);            } else {              task.setResource(resource);              TASK_POOL.submit(task);            }          }        },        0,        1,        TimeUnit.SECONDS);  }  public static void addTask(Runnable runnable, Type type) {    Integer tId = CODE_BUILDER.incrementAndGet();    Task task = Task.builder().tId(tId).type(type).work(runnable).build();    log.info("提交任務[{}]到任務隊列", tId);    TASK_QUEUE.add(task);  }  public static void main(String[] args) {    initResource();    enginDo();    Random random = new Random();    ThreadUtil.sleep(5000);    IntStream.range(0, 10)        .forEach(            a -> addTask(() -> ThreadUtil.sleep(random.nextInt(10) + 1, TimeUnit.SECONDS), Type.A));    IntStream.range(0, 10)        .forEach(            a -> addTask(() -> ThreadUtil.sleep(random.nextInt(10) + 1, TimeUnit.SECONDS), Type.B));    IntStream.range(0, 10)        .forEach(            a -> addTask(() -> ThreadUtil.sleep(random.nextInt(10) + 1, TimeUnit.SECONDS), Type.C));  }}

代碼說明:

1、首先我們構造瞭任務隊列,使用的是LinkedBlockingDeque,使用雙向隊列的原因是如果任務無法獲取資源,還需要塞到隊首,保證任務的有序性。
2、使用ConcurrentHashMap作為資源映射表,為瞭保證資源隊列使用的均衡性,一旦使用完成的資源會塞到對應資源的隊尾處。
3、其中實現瞭添加任務、提取資源、回歸資源幾個方法。
4、initResource方法可以初始化資源隊列,這裡面隻是簡單的隨機瞭幾個資源到A、B、C三種資源,塞入各類別隊列。
5、任務私有類有自己的任務標識以及執行完後調用回歸資源方法。
6、main方法中會分別提交需要3中資源的10個任務,看看調度情況。

執行結果



我們可以通過結果發現任務有序調度,使用完任務後回歸隊列。

總結

在工作中如果有用到的話,可以參考參考,本文主要是分享。
分享:
 when you really think about it? one of the hardest lessons in life is letting go. whether it`s guilt anger love or loss. Change is never easy .We fight to hold on, and we fight to let go. ——我也不知道來自哪,偶然看到的一句話。
如果本文對你有幫助的話,點個贊吧,謝謝!