`

淘宝面试题:如何充分利用多核CPU,计算很大的List中所有整数的和

阅读更多
永久链接:http://flysnow.iteye.com/blog/711162
引用
前几天在网上看到一个淘宝的面试题:有一个很大的整数list,需要求这个list中所有整数的和,写一个可以充分利用多核CPU的代码,来计算结果。

一:分析题目
从题中可以看到“很大的List”以及“充分利用多核CPU”,这就已经充分告诉我们要采用多线程(任务)进行编写。具体怎么做呢?大概的思路就是分割List,每一小块的List采用一个线程(任务)进行计算其和,最后等待所有的线程(任务)都执行完后就可得到这个“很大的List”中所有整数的和。
二:具体分析和技术方案
既然我们已经决定采用多线程(任务),并且还要分割List,每一小块的List采用一个线程(任务)进行计算其和,那么我们必须要等待所有的线程(任务)完成之后才能得到正确的结果,那么怎么才能保证“等待所有的线程(任务)完成之后输出结果呢”?这就要靠java.util.concurrent包中的CyclicBarrier类了。它是一个同步辅助类,它允许一组线程(任务)互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程(任务)的程序中,这些线程(任务)必须不时地互相等待,此时 CyclicBarrier 很有用。简单的概括其适应场景就是:当一组线程(任务)并发的执行一件工作的时候,必须等待所有的线程(任务)都完成时才能进行下一个步骤。具体技术方案步骤如下:
  • 分割List,根据采用的线程(任务)数平均分配,即list.size()/threadCounts。
  • 定义一个记录“很大List”中所有整数和的变量sum,采用一个线程(任务)处理一个分割后的子List,计算子List中所有整数和(subSum),然后把和(subSum)累加到sum上。
  • 等待所有线程(任务)完成后输出总和(sum)的值。

示意图如下:

三:详细编码实现
代码中有很详细的注释,这里就不解释了。
/**
 * 计算List中所有整数的和<br>
 * 采用多线程,分割List计算
 * @author 飞雪无情
 * @since 2010-7-12
 */
public class CountListIntegerSum {
	private long sum;//存放整数的和
	private CyclicBarrier barrier;//障栅集合点(同步器)
	private List<Integer> list;//整数集合List
	private int threadCounts;//使用的线程数
	public CountListIntegerSum(List<Integer> list,int threadCounts) {
		this.list=list;
		this.threadCounts=threadCounts;
	}
	/**
	 * 获取List中所有整数的和
	 * @return
	 */
	public long getIntegerSum(){
		ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
		int len=list.size()/threadCounts;//平均分割List
		//List中的数量没有线程数多(很少存在)
		if(len==0){
			threadCounts=list.size();//采用一个线程处理List中的一个元素
			len=list.size()/threadCounts;//重新平均分割List
		}
		barrier=new CyclicBarrier(threadCounts+1);
		for(int i=0;i<threadCounts;i++){
			//创建线程任务
			if(i==threadCounts-1){//最后一个线程承担剩下的所有元素的计算
				exec.execute(new SubIntegerSumTask(list.subList(i*len,list.size())));
			}else{
				exec.execute(new SubIntegerSumTask(list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1))));
			}
		}
		try {
			barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
		} catch (InterruptedException e) {
			System.out.println(Thread.currentThread().getName()+":Interrupted");
		} catch (BrokenBarrierException e) {
			System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
		}
		exec.shutdown();
		return sum;
	}
	/**
	 * 分割计算List整数和的线程任务
	 * @author lishuai
	 *
	 */
	public class SubIntegerSumTask implements Runnable{
		private List<Integer> subList;
		public SubIntegerSumTask(List<Integer> subList) {
			this.subList=subList;
		}
		public void run() {
			long subSum=0L;
			for (Integer i : subList) {
				subSum += i;
			}  
			synchronized(CountListIntegerSum.this){//在CountListIntegerSum对象上同步
				sum+=subSum;
			}
			try {
				barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处
			} catch (InterruptedException e) {
				System.out.println(Thread.currentThread().getName()+":Interrupted");
			} catch (BrokenBarrierException e) {
				System.out.println(Thread.currentThread().getName()+":BrokenBarrier");
			}
			System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
		}
		
	}
	
}

有人可能对barrier=new CyclicBarrier(threadCounts+1);//创建的线程数和主线程main有点不解,不是采用的线程(任务)数是threadCounts个吗?怎么为CyclicBarrier设置的给定数量的线程参与者比我们要采用的线程数多一个呢?答案就是这个多出来的一个用于控制main主线程的,主线程也要等待,它要等待其他所有的线程完成才能输出sum值,这样才能保证sum值的正确性,如果main不等待的话,那么结果将是不可预料的。
/**
 * 计算List中所有整数的和测试类
 * @author 飞雪无情
 * @since 2010-7-12
 */
public class CountListIntegerSumMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		List<Integer> list = new ArrayList<Integer>();
		int threadCounts = 10;//采用的线程数
		//生成的List数据
		for (int i = 1; i <= 1000000; i++) {
			list.add(i);
		}
		CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts);
		long sum=countListIntegerSum.getIntegerSum();
		System.out.println("List中所有整数的和为:"+sum);
	}

}

四:总结
本文主要通过一个淘宝的面试题为引子,介绍了并发的一点小知识,主要是介绍通过CyclicBarrier同步辅助器辅助多个并发任务共同完成一件工作。Java SE5的java.util.concurrent引入了大量的设计来解决并发问题,使用它们有助于我们编写更加简单而健壮的并发程序。

附mathfox提到的ExecutorService.invokeAll()方法的实现
这个不用自己控制等待,invokeAll执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。sdh5724也说用了同步,性能不好。这个去掉了同步,根据返回结果的 Future 列表相加就得到总和了。
/**
 * 使用ExecutorService的invokeAll方法计算
 * @author 飞雪无情
 *
 */
public class CountSumWithCallable {

	/**
	 * @param args
	 * @throws InterruptedException 
	 * @throws ExecutionException 
	 */
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		int threadCounts =19;//使用的线程数
		long sum=0;
		ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
		List<Callable<Long>> callList=new ArrayList<Callable<Long>>();
		//生成很大的List
		List<Integer> list = new ArrayList<Integer>();
		for (int i = 0; i <= 1000000; i++) {
			list.add(i);
		}
		int len=list.size()/threadCounts;//平均分割List
		//List中的数量没有线程数多(很少存在)
		if(len==0){
			threadCounts=list.size();//采用一个线程处理List中的一个元素
			len=list.size()/threadCounts;//重新平均分割List
		}
		for(int i=0;i<threadCounts;i++){
			final List<Integer> subList;
			if(i==threadCounts-1){
				subList=list.subList(i*len,list.size());
			}else{
				subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1));
			}
			//采用匿名内部类实现
			callList.add(new Callable<Long>(){
				public Long call() throws Exception {
					long subSum=0L;
					for(Integer i:subList){
						subSum+=i;
					}
					System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum);
					return subSum;
				}
			});
		}
		List<Future<Long>> futureList=exec.invokeAll(callList);
		for(Future<Long> future:futureList){
			sum+=future.get();
		}
		exec.shutdown();
		System.out.println(sum);
	}

}

一些感言
这篇文章是昨天夜里11点多写好的,我当时是在网上看到了这个题目,就做了一下分析,写了实现代码,由于水平有限,难免有bug,这里感谢xifo等人的指正。这些帖子从发表到现在不到24小时的时间里创造了近9000的浏览次数,回复近100,这是我没有想到的,javaeye很久没这么疯狂过啦。这不是因为我的算法多好,而是因为这个题目、这篇帖子所体现出的意义。大家在看完这篇帖子后不光指正错误,还对方案进行了改进,关键是思考,人的思维是无穷的,只要我们善于发掘,善于思考,总能想出一些意想不到的方案。

从算法看,或者从题目场景对比代码实现来看,或许不是一篇很好的帖子,但是我说这篇帖子是很有意义的,方案也是在很多场景适用,有时我们可以假设这不是计算和,而是把数据写到一个个的小文件里,或者是分割进行网络传输等等,都有一定的启发,特别是回帖中的讨论。

单说一下回帖,我建议进来的人尽量看完所有的回帖,因为这里是很多人集思广益的精华,这里有他们分析问题,解决问题的思路,还有每个人提到的解决方案,想想为什么能用?为什么不能用?为什么好?为什么不好?


我一直相信:讨论是解决问题、提高水平的最佳方式!

  • 大小: 95.8 KB
分享到:
评论
185 楼 ruijin5566 2015-12-13  
package concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * 用Java7中的Fork/Join框架计算 int数组之和
 * 《Java并发编程的艺术》Page177-178
 */
public class CountSumWithForkJoin {

	private static final int THREADCOUNT = Runtime.getRuntime().availableProcessors()+1; //线程数
	private static final int NUM = 20000000 + 1; //要计算的list大小
	private static final int THRESHOLD = NUM / THREADCOUNT; //阈值
	private static List<Integer> list = new ArrayList<Integer>(NUM); //需要计算和的数组
	
	public static void main(String[] args) {
		// 初始化数组
		for (int i = 0; i < NUM; i++) {
			list.add(i);
		}
		long startTime = System.nanoTime();
		long serialSum = 0;
		for (int i = 0; i < NUM; i++) {
			serialSum += list.get(i);
		}
		long usedTime = System.nanoTime() - startTime;
		System.out.println("[单线程] Sum is : " + serialSum + ", time is : " + usedTime);
		
		startTime = System.nanoTime();
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0, NUM - 1);
		Future<Long> result = forkJoinPool.submit(task);
		try {
			long sum = result.get();
			usedTime = System.nanoTime() - startTime;
			System.out.println("[多线程] Sum is : " + sum + ", time is : " + usedTime);
		} catch (InterruptedException e) {
		} catch (ExecutionException e) {
		}
	}
	
	static class CountTask extends RecursiveTask<Long> {
		private int start;
		private int end;
		
		public CountTask(int start, int end) {
			this.start = start;
			this.end = end;
		}
		
		@Override
		protected Long compute() {
			long sum = 0L;
			//如果任务小就计算任务
			boolean canCompute = (end - start) <= THRESHOLD;
			if (canCompute) {
				for (int i = start; i <= end; i++) {
					sum += list.get(i); //如果用list.get操作,则非常耗时,在任何情况下,耗时都比单线程高
				}
			} else {
				//如果任务大于阈值,就分裂成两个子任务计算
				int middle = (start + end) / 2;
				CountTask leftTask = new CountTask(start, middle);
				CountTask rightTask = new CountTask(middle + 1, end);
				//执行子任务
				leftTask.fork();
				rightTask.fork();
				//等待子任务执行完毕,并得到其结果
				long leftResult = leftTask.join();
				long rightResult = rightTask.join();
				sum = leftResult + rightResult;
				System.out.println(Thread.currentThread().getName() + ", start=" + start 
						+ ", end=" + end + ", middle=" + middle + ", sum=" + sum);
			}
			return sum;
		}
		
	}
	
}

昨天看完了《Java并发编程的艺术》,今天用Java7里面的Fork/Join框架写了一个,在我的笔记本(i5-4210H CPU @ 2.90GHz,双核四线程)上跑,几乎在任何NUM下,单线程的效率都更高,哪里出问题了,囧。
184 楼 Checkmate 2014-09-15  
fork-join
183 楼 793059909 2014-07-28  
chenyongxin 写道
mercyblitz 写道
viei 写道
个人觉的你的这些算法啊,线程操作可能都是白忙活,或者说对这种问题处理的很浅,还不深入。算法挺简单的实现起来也有很多办法,多线程调度什么的也都是基本java知识。
我个人觉的的你要抓住问题的重点,要是用多cpu充分发挥多cpu的优势,首先要把任务分发,你只是起多个线程并不一定操作jvm和操作系统就会把任务分发到多cpu上执行,只有可能时间片切的更小,在执行这些任务。
所以这个过程中考虑的重点应该是
1:操作系统(开多个jvm,规定每个jvm进程运行在指定cpu上,肯定比一个进程下的多个线程只占用一个cpu对多cpu的压榨更好)
2:jvm调整(大数据量必然涉及到垃圾回收)
3:程序编写
上面这些弄好了,再深入一点就考虑,同步消耗问题
cpu多了,同步因素也是决定是否能把多cpu和性能转化率提高出来的一个重要环节。


你的说法容易误人子弟。

Java1.2之后,Java多线程依赖于操作系统的内核线程调度。操作系统会不会发挥多处理的优势呢?肯定会啊,操作系统作为基础设施,实时性是它最重视之一。
针对于你的观点,我进行反驳:
1.在进程之间,多个JVM的Heap不能相互共享。更谈不上制定那个CPU制定JVM进行,要知道主存是共享的,CPU的处理数据的。多核CPU不是多台机器,那个CPU负责,是由OS调度,用户进程没有办法控制内核进程。

2.List#sublist方法实现,当大List分离出多个小List,小List并没有放弃大的List,而是还是引用的大List。在计算之中,不会被GC掉。之后被GC掉,对计算没有影响。调整JVM需要是对的,但是调整是Java Heap的大小,而不是为了GC。如果要说GC的话,计算中虽然不会被GC,但是GC会停顿(Pause/Stop World操作),丧失了实时性。





关于"ava1.2之后,Java多线程依赖于操作系统的内核线程调度。"这个点,有没有官方文档或例子可以来演示一下

182 楼 little_shieh 2014-07-17  
little_shieh 写道
用CountDownLatch来实现吧,也很方便


用Executor Framework来做也很简单

public class CalculatorSubList implements Callable<Long> {
private List<Long> subList;

public CalculatorSubList(List<Long> subList) {
super();
this.subList = subList;
}

@Override
public Long call() throws Exception {
Long subSum = 0L;
for(Long item:subList){
subSum+=item;
}
return subSum;
}
}

public class Main {

static List<Long> calList = new ArrayList<Long>();
static int subSize = 12;//每个子列表的长度
static int threadNum = 0;//计算需要多少个线程去计算

static {
for (Long i = 0L; i < 1000000000004454444L; i++) {
calList.add(i);
}
threadNum = (calList.size() + subSize - 1) / subSize;
}

public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newFixedThreadPool(threadNum);

List<Future<Long>> resultList = new ArrayList<Future<Long>>();

for (int i = 0; i < threadNum; i++) {
int fromIndex = subSize * i;
int toIndex = (subSize * (i + 1) > calList.size()) ? calList.size()
: subSize * (i + 1);
CalculatorSubList calculator = new CalculatorSubList(calList.subList(fromIndex, toIndex));

Future<Long> result = executor.submit(calculator);

resultList.add(result);
}

do {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (executor.getCompletedTaskCount() < resultList.size());

System.out.printf("Main: Results\n");
Long tolSum = 0L;
for (int i = 0; i < resultList.size(); i++) {
Future<Long> result = resultList.get(i);

Long number = null;

try {
number = result.get();
tolSum += number;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.printf("Main: Result : %d\n", tolSum);
executor.shutdown();
}
181 楼 ray_linn 2014-07-14  
分割 list 的做法简直是。。浪费时间,应该从各自的下标开始计算。

比如3个线程,list 长度为10

线程1:负责 0, 1, 2;
线程2:负责 3, 4, 5
线程3:负责 6, 7, 8,9;
180 楼 little_shieh 2014-07-14  
用CountDownLatch来实现吧,也很方便
179 楼 skzr.org 2013-05-10  
checkes 写道
checkes 写道
还得考虑一下进程的通信,多核cpu情况,假如是4核,当前jdk单jvm不管你怎么设计,只能使用cpu25%,所以你得开4个进程,并实现jvm通信,最方便的是jvm,当然还有其它文件共享之类,但是会稍显麻烦

就java应该是没有问题,我用jni调用一个算法库遇到了这处问题,我估计是算法和cpu绑定了


JVM本身是支持多核的,你的jni只是在一个线程中调用吧,如果这样,当然只能使用一个cpu线程,要利用cpu的话,要用多个Thread调用jni才能享受到优势。
178 楼 checkes 2013-05-09  
checkes 写道
还得考虑一下进程的通信,多核cpu情况,假如是4核,当前jdk单jvm不管你怎么设计,只能使用cpu25%,所以你得开4个进程,并实现jvm通信,最方便的是jvm,当然还有其它文件共享之类,但是会稍显麻烦

就java应该是没有问题,我用jni调用一个算法库遇到了这处问题,我估计是算法和cpu绑定了
177 楼 checkes 2013-05-09  
还得考虑一下进程的通信,多核cpu情况,假如是4核,当前jdk单jvm不管你怎么设计,只能使用cpu25%,所以你得开4个进程,并实现jvm通信,最方便的是jvm,当然还有其它文件共享之类,但是会稍显麻烦
176 楼 mutuu__ 2012-08-06  
如果是JDK1.7的话,用Fork/Join就可以了吧,类似于MapReduce。
175 楼 yangfuchao418 2012-07-10  
楼主 使用FutureTask也行吧
174 楼 xm_king 2011-08-01  
winstars 写道
我觉得这种东西迟早都会过时的,靠程序员在代码中体现多核技术,就跟当年程序员手动分配内存一样,迟早会变成过去式。

不能因为过时就不学习了吧,这样的话,那就只能原地踏步
173 楼 star022 2011-05-20  
我来个:
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class TestSum {
    private static final int           cpu             = 8;
    private static ExecutorService     executorService = Executors.newFixedThreadPool(cpu);
    private static AtomicInteger       taskCounter     = new AtomicInteger(0);
    private static int                 indexCounter    = 0;
    private static Set<Long>           tmpResultSet    = new HashSet<Long>();
    private static final List<Integer> ints            = new ArrayList<Integer>();

    private TestSum() {
    }

    public static void main(String[] args) {
        //构建测试数据开始
        final int count = 25702301;//我的笔记本最多能承受 值,过大就OOM =.=
        for (int i = 0; i < count; i++) {
            int value = (int) (Math.random() * 100);
            ints.add(value);
        }
        //构建测试数据结束,开始求和
        /**
         * 假设为8核cpu,将整个list 虚拟地分别8段,每个线程负责一段的计算sum
         */
        final int avg = count / cpu;
        while (taskCounter.getAndIncrement() < cpu) {//保障只能起和cpu个数一样多的线程
            executorService.execute(new Runnable() {
                public void run() {
                    int curt = indexCounter++;
                    int min = avg * curt;
                    int max;
                    if (curt != (cpu - 1)) {
                        max = avg * (curt + 1);
                    } else {
                        max = count;
                    }
                    System.out.println(max);
                    long result = 0;
                    for (int i = min; i < max; i++) {
                        result = result + ints.get(i);
                    }
                    tmpResultSet.add(result);
                    if (tmpResultSet.size() == cpu) {
                       long finalResult = 0;
                        for (long curtTmpResult : tmpResultSet) {
                            finalResult = finalResult + curtTmpResult;
                        }
                        System.out.println("finalResult=" + finalResult);
                    }
                }
            });
        }
        executorService.shutdown();
    }
}
172 楼 star022 2011-05-20  
JDK7的 fork/jion框架 能很好地解决这类问题。
171 楼 whao189 2011-05-19  
每次看到 “飞雪无情 ” 的帖子 貌似我都能有些收获这次也不例外!
170 楼 zoutm 2011-05-19  
本题的目的大家都清楚,利用java多线程来充分利用多CPU或多核。具体方案如下:
1.多线程,线程数目已经有人提到过,最佳的线程数应该是跟CPU核相关,调用Runtime.getRuntime().availableProcessors获取处理器数。
2.很多人采用拆分List,我的观点是不拆分。调用List.size,平均分配索引范围给多个线程。
3.每个线程按索引范围依次统计。最后主线程等待其他线程统计完成后最终统计值。
package com.webex.dms2.components.zk.common;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.lang.math.RandomUtils;

public class ListCounter {
	private static List dataPrepair(int listSize) {
		List<Integer> list = new ArrayList<Integer>(listSize);
		for (int i = 0; i < listSize; i++) {
			list.add(RandomUtils.nextInt());
		}
		return list;
	}

	public static void main(String[] args) throws Exception {
		int threadNum = Runtime.getRuntime().availableProcessors();
		int listSize = 10000000;
		final List<Integer> list = dataPrepair(listSize);
		int threadListSize = listSize / threadNum;
		assert listSize > threadNum;
		final int[] dataRange = new int[threadNum];
		for (int i = 0; i < threadNum - 1; i++) {
			dataRange[i] = threadListSize * (i + 1);
		}
		dataRange[threadNum - 1] = listSize;
		ExecutorService executor = Executors.newFixedThreadPool(threadNum);
		Future<Long>[] results = new Future[threadNum];
		for (int i = 0; i < threadNum; i++) {
			results[i] = executor.submit(new CounterTask(dataRange[i]
					- threadListSize, dataRange[i], list));
		}
		long total = 0l;
		for (int i = 0; i < threadNum; i++) {
			total += results[i].get();
		}
		System.out.println("the list total sum:" + total);
		executor.shutdown();

	}

	private static class CounterTask implements Callable<Long> {
		final private int lowerLimit;
		final private int upperLimit;
		final private List<Integer> list;

		public CounterTask(int lowerLimit, int upperLimit,
				List<Integer> counteredList) {
			this.lowerLimit = lowerLimit;
			this.upperLimit = upperLimit;
			this.list = counteredList;
		}

		@Override
		public Long call() throws Exception {
			long total = 0;
			for (int i = lowerLimit; i < upperLimit; i++) {
				total += list.get(i);
			}
			return total;
		}

	}

}


169 楼 notime8888 2011-03-29  
没有必要这么麻烦的
如果提交任务数与固定线程池线程数相等的话


pool.shutdown()

while(!pool.isTerminated()){
   // sleep 500L
}

sysout(sum);



shutdown会等待所有正在执行的任务执行完毕
isTerminated直到调用shutdown且所有任务complete才会返回true

当然 上述代码的前提是提交任务数<=固定线程池线程数
168 楼 winstars 2011-03-29  
我觉得这种东西迟早都会过时的,靠程序员在代码中体现多核技术,就跟当年程序员手动分配内存一样,迟早会变成过去式。
167 楼 s929498110 2011-03-28  
额。。。看错了。。。 都对
166 楼 s929498110 2011-03-28  
我晕。。。我自己写的代码和你给的那个Callable代码运算结果不一样。。。

相关推荐

Global site tag (gtag.js) - Google Analytics