Java的CompletableFuture,Java的多线程开发
来源:博客园     时间:2023-05-25 14:50:31
三、Java8的CompletableFuture,Java的多线程开发1、CompletableFuture的常用方法以后用到再加
runAsync() :开启异步(创建线程执行任务),无返回值supplyAsync() :开启异步(创建线程执行任务),有返回值thenApply() :然后应用,适用于有返回值的结果,拿着返回值再去处理。exceptionally():用于处理异步任务执行过程中出现异常的情况的一个方法:返回默认值或者一个替代的 CompletableFuture 对象,从而避免系统的崩溃或异常处理的问题。handle():类似exceptionally()get()  :阻塞线程:主要可以: ①获取线程中的异常然后处理异常、②设置等待时间join() :阻塞线程:推荐使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。他自己会抛出异常。CompletableFuture.allOf()CompletableFuture.anyOf()
get() 和 join()方法区别?都可以阻塞线程 —— 等所有任务都执行完了再执行后续代码。
CompletableFuture 中的  get()  和  join()  方法都用于获取异步任务的执行结果,但是在使用时需要注意以下几点区别:  1. 抛出异常的方式不同:如果异步任务执行过程中出现异常, get()  方法会抛出 ExecutionException 异常,而  join()  方法会抛出 CompletionException 异常,这两个异常都是继承自 RuntimeException 的。  2. 方法调用限制不同: join()  方法是不可以被中断的,一旦调用就必须等待任务执行完成才能返回结果;而  get()  方法可以在调用时设置等待的超时时间,如果超时还没有获取到结果,就会抛出 TimeoutException 异常。  3. 返回结果类型不同: get()  方法返回的是异步任务的执行结果,该结果是泛型类型 T 的,需要强制转换才能获取真正的结果;而  join()  方法返回的是异步任务的执行结果,该结果是泛型类型 T,不需要强制转换。  4. 推荐使用方式不同:推荐在 CompletableFuture 中使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。  综上所述, get()  方法和  join()  方法都是获取异步任务的执行结果,但是在使用时需要根据具体场景选择使用哪个方法。如果需要获取执行结果并且不希望被中断,推荐使用  join()  方法;如果需要控制等待时间或者需要捕获异常,则可以使用  get()  方法。
anyOf() 和 allOf()的区别?
CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它支持链式调用、组合和转换异步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的两个常用方法,它们的区别如下:  1. anyOf:任意一个 CompletableFuture 完成,它就会跟随这个 CompletableFuture 的结果完成,返回第一个完成的 CompletableFuture 的结果。  2. allOf:所有的 CompletableFuture 都完成时,它才会跟随它们的结果完成,返回一个空的 CompletableFuture。  简而言之,anyOf 和 allOf 的最大区别是:anyOf 任意一个 CompletableFuture 完成就跟着它的结果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,并返回一个空的 CompletableFuture。  举例来说,如果有三个 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能会返回一个字符串,而 f3 可能会返回一个整数,那么:  - anyOf(f1, f2, f3) 的结果是 f1、f2、f3 中任意一个 CompletableFuture 的结果; - allOf(f1, f2, f3) 的结果是一个空的 CompletableFuture,它的完成状态表示 f1、f2、f3 是否全部完成。  总之,anyOf 和 allOf 在实际使用中可以根据不同的需求来选择,它们都是 CompletableFuture 中非常强大的组合操作。
2、使用CompletableFuture2.1、实体类准备
package com.cc.md.entity;import lombok.Data;/** * @author CC * @since 2023/5/24 0024 */@Datapublic class UserCs {    private String name;    private Integer age;}
2.2、常用方式无返回值推荐:开启多线程——无返回值的——阻塞:test06
@Resource(name = "myIoThreadPool")    private ThreadPoolTaskExecutor myIoThreadPool;        //CompletableFuture开启多线程——无返回值的    @Test    public void test06() throws Exception {        List> futures = new ArrayList<>();        //循环,模仿很多任务        for (int i = 0; i < 1000; i++) {            int finalI = i;            CompletableFuture future = CompletableFuture.runAsync(() -> {                //第一批创建的线程数                log.info("打印:{}", finalI);                //模仿io流耗时                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }            }, myIoThreadPool);            futures.add(future);        }        //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码        //如果不阻塞,上面的相当于异步执行了。        //阻塞方式1:可以获取返回的异常、设置等待时间//        futures.forEach(future -> {//            try {//                future.get();//            } catch (Exception e) {//                throw new RuntimeException(e);//            }//        });        //阻塞方式2(推荐)        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();        log.info("打印:都执行完了。。。");    }
有返回值推荐:开启多线程——有返回值的,返回一个新的List——阻塞——使用stream流的map:test09test07、test08 可以转化为 test09(现在这个)可以返回任务类型的值,不一定要返回下面的user对象。
@Resource(name = "myIoThreadPool")    private ThreadPoolTaskExecutor myIoThreadPool;        //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map    //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值    //使用stream流的map + CompletableFuture.supplyAsync()    @Test    public void test09() throws Exception {        //先获取数据,需要处理的任务。        List users = this.getUserCs();        //莫法处理任务        List> futures = users.stream()                .map(user -> CompletableFuture.supplyAsync(() -> {                    // 处理数据                    user.setName(user.getName() + "-改");                    log.info("打印-改:{}", user.getName());                    // 其他的业务逻辑。。。                    return user;                }, myIoThreadPool)).collect(Collectors.toList());        //获取futures        List endList = futures.stream()                //阻塞所有线程                .map(CompletableFuture::join)                //取age大于10的用户                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }
2.3、异常处理exceptionallyhandle
//CompletableFuture 异常处理    @Test    public void test10() throws Exception {        //先获取数据,需要处理的任务。        List users = this.getUserCs();        //莫法处理任务        List> futures = users.stream()                .map(user -> CompletableFuture.supplyAsync(() -> {                        if (user.getAge() > 5){                            int a = 1/0;                        }                        // 处理数据                        user.setName(user.getName() + "-改");                        log.info("打印-改:{}", user.getName());                        // 其他的业务逻辑。。。                        return user;                    }, myIoThreadPool)                    //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。                    .exceptionally(throwable -> {                        //可以直接获取user                        System.out.println("异常了:" + user);                        //处理异常的方法……                        //1还可以进行业务处理……比如将异常数据存起来,然后导出……                        //2返回默认值,如:user、null                        //return user;                        //3抛出异常                        throw new RuntimeException(throwable.getMessage());                    })                    //处理异常方式2:类似exceptionally(不推荐)//                    .handle((userCs, throwable) -> {//                        System.out.println("handle:" + user);//                        if (throwable != null) {//                            // 处理异常//                            log.error("处理用户信息出现异常,用户名为:" + user.getName(), throwable);//                            // 返回原始数据//                            return userCs;//                        } else {//                            // 返回正常数据//                            return userCs;//                        }//                    })                )                .collect(Collectors.toList());        //获取futures        List endList = futures.stream()                //阻塞所有线程                .map(CompletableFuture::join)                //取age大于10的用户                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }
2.4、CompletableFuture的使用测试

1、推荐使用:test03、test05、test09、test10、test11


(资料图片)

2、test07、test08就是test09的前身。

test01:获取当前电脑(服务器)的cpu核数

test02:线程池原始的使用(不推荐直接这样用)

test03:开启异步1 —— @Async

test04:开启异步2 —— CompletableFuture.runAsync()

test05:开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有异步方法,一起提交

相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。

test052:开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join

test06:CompletableFuture开启多线程——无返回值的

test07:CompletableFuture开启多线程——无返回值的——构建一个新List

1、相当于多线程执行任务,然后把结果插入到List中2、接收多线程的List必须是线程安全的,ArrayList线程不安全   线程安全的List —— CopyOnWriteArrayList 替代 ArrayList

test08:CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况(基本和test07是一个方法)

test09:CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map

test10:CompletableFuture 异常处理。相当于是 test09的增强,处理异常

test11:CompletableFuture 异常处理:如果出现异常就舍弃任务

1、想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢?2、发现了异常任务也就完了。而且打印了异常,相当于返回了异常。3、未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

package com.cc.md;import com.cc.md.entity.UserCs;import com.cc.md.service.IAsyncService;import org.junit.jupiter.api.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.Resource;import java.util.*;import java.util.concurrent.CompletableFuture;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;@SpringBootTestclass Test01 {    private static final Logger log = LoggerFactory.getLogger(Test01.class);    @Resource(name = "myIoThreadPool")    private ThreadPoolTaskExecutor myIoThreadPool;    /**     * 异步类     */    @Resource    private IAsyncService asyncService;    @Test    void test01() {        //获取当前jdk能调用的CPU个数(当前服务器的处理器个数)        int i = Runtime.getRuntime().availableProcessors();        System.out.println(i);    }    //线程池原始的使用    @Test    void test02() {        try {            for (int i = 0; i < 1000; i++) {                int finalI = i;                myIoThreadPool.submit(() -> {                    //第一批创建的线程数                    log.info("打印:{}", finalI);                    //模仿io流耗时                    try {                        Thread.sleep(5000);                    } catch (InterruptedException e) {                        throw new RuntimeException(e);                    }                });            }        }catch(Exception e){            throw new RuntimeException(e);        }finally {            myIoThreadPool.shutdown();        }    }    //开启异步1 —— @Async    @Test    public void test03() throws Exception {        log.info("打印:{}", "异步测试的-主方法1");        asyncService.async1();        asyncService.async2();        //不会等待异步方法执行,直接返回前端数据        log.info("打印:{}", "异步测试的-主方法2");    }    //开启异步2 —— CompletableFuture.runAsync()    @Test    public void test04() throws Exception {        log.info("打印:{}", "异步测试的-主方法1");        CompletableFuture.runAsync(() -> {            log.info("打印:{}", "异步方法1!");            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            this.async2("异步方法1!-end");        }, myIoThreadPool);        //不会等待异步方法执行,直接返回前端数据        log.info("打印:{}", "异步测试的-主方法2");    }    //异步需要执行的方法,可以写在同一个类中。    private void async2(String msg) {        //模仿io流耗时        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }        log.info("打印:{}", msg);    }    //开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync()  —— 阻塞所有异步方法,一起提交    //相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。    @Test    public void test05() throws Exception {        log.info("打印:{}", "异步测试的-主方法1");        //异步执行1        CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {            log.info("打印:{}", "异步方法1!");            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            this.async2("异步方法1-end");            return "异步方法1";        }, myIoThreadPool);        //异步执行2        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {            log.info("打印:{}", "异步方法2!");            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            this.async2("异步方法2-end");            return "异步方法2";        }, myIoThreadPool);        //异步执行3,不用我们自己的线程池 —— 用的就是系统自带的 ForkJoinPool 线程池        CompletableFuture future3 = CompletableFuture.runAsync(() -> {            log.info("打印:{}", "异步方法3!");            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            this.async2("异步方法3-end");        });        //阻塞所有异步方法,一起提交后才走下面的代码        CompletableFuture.allOf(future1, future2, future3).join();        log.info("打印:{}", "异步-阻塞-测试的-主方法2-end");    }    //开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join    // CompletableFuture 的 get 和 join 方法区别:    // get:①可以获取线程中的异常、②设置等待时间    // join:推荐在 CompletableFuture 中使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。    @Test    public void test052() throws Exception {        log.info("打印:{}", "异步测试的-主方法1");        //异步执行1        CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {            log.info("打印:{}", "异步方法1!");            // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            String str = "异步方法1-end";            this.async2(str);            return str;        }, myIoThreadPool);        // 异步执行2 - 无返回值 —— 分开写的方式        CompletableFuture future2 = future1.thenAccept(str1 -> {            log.info("打印:{}", "异步方法2!");            // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            this.async2(String.format("%s-加-异步方法2! - 无返回值 - ",str1));        });        // 异步执行3 - 有返回值 —— 分开写future1,连写future3方式        CompletableFuture future3 = future1.thenApply(str2 -> {            log.info("打印:{}", "异步方法3!");            // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。            this.async2(String.format("%s-加-异步方法3! - 有返回值 - ", str2));            return "异步执行3 - 有返回值 ";            //连写的方式。        }).thenApply(str3 -> {            String format = String.format("%s- end", str3);            log.error("异步3然后应用 - {}", format);            //返回后面的应用            return format;        });        // 获取future3的返回值:        //如果需要捕获异常、设置等待超时时间,则用get        log.info("future3的返回值(不阻塞):{}", future3.get());//        log.info("future3的返回值(不阻塞-设置等待时间,超时报错:TimeoutException):{}",//                future3.get(2, TimeUnit.SECONDS));        //推荐使用 join方法//        log.info("future3的返回值(阻塞):{}", future3.join());        //阻塞所有异步方法,一起提交后才走下面的代码        CompletableFuture.allOf(future1, future2).join();        log.info("打印:{}", "异步-阻塞-测试的-主方法2-end");    }    //CompletableFuture开启多线程——无返回值的    @Test    public void test06() throws Exception {        List> futures = new ArrayList<>();        //循环,模仿很多任务        for (int i = 0; i < 1000; i++) {            int finalI = i;            CompletableFuture future = CompletableFuture.runAsync(() -> {                //第一批创建的线程数                log.info("打印:{}", finalI);                //模仿io流耗时                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }            }, myIoThreadPool);            futures.add(future);        }        //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码        //如果不阻塞,上面的相当于异步执行了。        //阻塞方式1:可以获取返回的异常、设置等待时间//        futures.forEach(future -> {//            try {//                future.get();//            } catch (Exception e) {//                throw new RuntimeException(e);//            }//        });        //阻塞方式2(推荐)        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();        log.info("打印:都执行完了。。。");    }    //CompletableFuture开启多线程——无返回值的——构建一个新List    //相当于多线程执行任务,然后把结果插入到List中    //接收多线程的List必须是线程安全的,ArrayList线程不安全    //线程安全的List —— CopyOnWriteArrayList 替代 ArrayList    @Test    public void test07() throws Exception {        List> futures = new ArrayList<>();        //存数据的List        List addList = new CopyOnWriteArrayList<>();        //循环,模仿很多任务        for (int i = 0; i < 1000; i++) {            int finalI = i;            CompletableFuture future = CompletableFuture.runAsync(() -> {                log.info("打印:{}", finalI);                UserCs userCs = new UserCs();                userCs.setName(String.format("姓名-%s", finalI));                userCs.setAge(finalI);                addList.add(userCs);            }, myIoThreadPool);            futures.add(future);        }        //阻塞        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();        //返回新的List:endList,取age大于10的用户        List endList = addList.stream()                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }    //CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况    //用CopyOnWriteArrayList 替代 ArrayList接收    @Test    public void test08() throws Exception {        //先获取数据,需要处理的任务。        List users = this.getUserCs();        //开启多线程        List> futures = new ArrayList<>();        //存数据的List        List addList = new CopyOnWriteArrayList<>();        //莫法处理任务        users.forEach(user -> {            CompletableFuture future = CompletableFuture.runAsync(() -> {                //添加数据                user.setName(user.getName() + "-改");                addList.add(user);                log.info("打印-改:{}", user.getName());                //其他的业务逻辑。。。            }, myIoThreadPool);            futures.add(future);        });        //阻塞        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();        //返回新的List:endList        List endList = addList.stream()                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }    //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map    //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值    //使用stream流的map + CompletableFuture.supplyAsync()    @Test    public void test09() throws Exception {        //先获取数据,需要处理的任务。        List users = this.getUserCs();        //莫法处理任务        List> futures = users.stream()                .map(user -> CompletableFuture.supplyAsync(() -> {                    // 处理数据                    user.setName(user.getName() + "-改");                    log.info("打印-改:{}", user.getName());                    // 其他的业务逻辑。。。                    return user;                }, myIoThreadPool)).collect(Collectors.toList());        //获取futures        List endList = futures.stream()                //阻塞所有线程                .map(CompletableFuture::join)                //取age大于10的用户                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }    //基础数据    private List getUserCs() {        List users = new ArrayList<>();        for (int i = 0; i < 1000; i++) {            UserCs userCs = new UserCs();            userCs.setName(String.format("姓名-%s", i));            userCs.setAge(i);            users.add(userCs);        }        return users;    }    //CompletableFuture 异常处理    @Test    public void test10() throws Exception {        //先获取数据,需要处理的任务。        List users = this.getUserCs();        //莫法处理任务        List> futures = users.stream()                .map(user -> CompletableFuture.supplyAsync(() -> {                        if (user.getAge() > 5){                            int a = 1/0;                        }                        // 处理数据                        user.setName(user.getName() + "-改");                        log.info("打印-改:{}", user.getName());                        // 其他的业务逻辑。。。                        return user;                    }, myIoThreadPool)                    //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。                    .exceptionally(throwable -> {                        //可以直接获取user                        System.out.println("异常了:" + user);                        //处理异常的方法……                        //1还可以进行业务处理……比如将异常数据存起来,然后导出……                        //2返回默认值,如:user、null                        //return user;                        //3抛出异常                        throw new RuntimeException(throwable.getMessage());                    })                    //处理异常方式2:类似exceptionally(不推荐)//                    .handle((userCs, throwable) -> {//                        System.out.println("handle:" + user);//                        if (throwable != null) {//                            // 处理异常//                            log.error("处理用户信息出现异常,用户名为:" + user.getName(), throwable);//                            // 返回原始数据//                            return userCs;//                        } else {//                            // 返回正常数据//                            return userCs;//                        }//                    })                )                .collect(Collectors.toList());        //获取futures        List endList = futures.stream()                //阻塞所有线程                .map(CompletableFuture::join)                //取age大于10的用户                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }    //CompletableFuture 异常处理:如果出现异常就舍弃任务。    // 想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢?    // 发现了异常任务也就完了。而且打印了异常,相当于返回了异常。    // 未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture    @Test    public void test11() {        List users = getUserCs();        List> futures = users.stream()                .map(user -> CompletableFuture.supplyAsync(() -> {                            if (user.getAge() > 15) {                                int a = 1 / 0;                            }                            user.setName(user.getName() + "-改");                            log.info("打印-改:{}", user.getName());                            return user;                        }, myIoThreadPool)                        //处理异常                        .exceptionally(throwable -> {                            //其他处理异常的逻辑                            return null;                        })                )                //舍弃返回的对象是null的 CompletableFuture                .filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList());        //获取futures        List endList = futures.stream()                //阻塞所有线程                .map(CompletableFuture::join)                //取age大于10的用户                .filter(user -> user.getAge() > 10)                //按照age升序排序                .sorted(Comparator.comparing(UserCs::getAge))                .collect(Collectors.toList());        log.info("打印:都执行完了。。。{}", endList);    }}

标签:

广告

X 关闭

广告

X 关闭