가끔 동일한 작업을 반복적으로 수행할 때 ExecutorService 를 사용하면 순차적으로 수행하는 것 보다 낳은 성능을 볼 수 있을 것임.

또한 계산 모듈이 종료되고 나온 결과값으로 다음 작업을 수행할 때 invokeAll() 은 유용할듯.

1. InvokeAllTest.java
package study.concurrency.invoke;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllTest {

 final ExecutorService executorService  = Executors.newFixedThreadPool(5);
 final List<WorkerTask> list    = new ArrayList<WorkerTask>();
 
 public void makeTaskList() {
  for(int i = 0;i<10;i++) {
   list.add(new WorkerTask());
  }
 }
 
 public void test() {
  try { 
   List<Future> futureList = executorService.invokeAll((Collection)list);
  
   try {
    for(Future future : futureList ) {
     System.out.println(future.get());
    }
   } catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
   executorService.shutdown();

  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

  System.out.println("Completed");
 }
 
 public static void main(String[] args) {
  new Launcher().test();
 }
}

2. WorkerTask.java
package study.concurrency.invoke;

import java.util.concurrent.Callable;

public class WorkerTask implements Callable<String> {

 public String call() throws Exception {
  // TODO Auto-generated method stub
  for(int i = 0;i<5;i++) {
   System.out.println(Thread.currentThread().getName() + ":" + "running...");
   Thread.sleep(500);
  }
  return Thread.currentThread().getName() + ":" + "success";
 }
}

Posted by 짱가쟁이
몇년전쯤에 한 선배 왈..
"너는 기본이 부족한거 같다."

처음 그 소리를 들었을때 느껴지는 그 쓸쓸함이란..

요 몇일 java performance fundamental, java concurrency in practice 를 읽으면서 그 선배의 말이 뇌리를 울리는건 무엇일까? 반성하고 또 반성해야겠다.

항상 잊어버리는 누군가를 위해서.. 작업물을 올린다.

1. SimpleServer.java
- 간단한 서버소켓 샘플에 스레드풀만 추가한 단순한 모습.
package study.threadpool.server;

import java.io.IOException;


import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SimpleServer {

 // 스레드 풀은 실행할 작업이 없어도 corePoolSize 를 유지한다.
 // 최초에 corePoolSize 만큼 스레드를 생성하지 않고 작업이 실행 될 때 corePoolSize 만큼 차례대로 생성함.
 // prestartAllCoreThreads() 를 사용하면 corePoolSize 만큼 스레드를 미리 생성함.
 final int corePoolSize   = 10;
 // xaximumPoolsize 는 동시에 얼마나 많은 개수의 스레드가 동작할 수 있는지를 제한하는 최대값.
 final int xaximumPoolsize  = 10;
 final int blocdingQueueSize = 100;
 final int port    = 10001;
 // keepAliveTime 는 스레드 유지 시간으로 스레드가 keepAliveTime 이상 대기하고 있으면
 // 해당 스레드는 제거 될 수 있음. 풀의 스레드 개수가 corePoolSize 를 넘어서면 제거될 수 있음.
 final long keepAliveTime = 30L;

 final ThreadPoolExecutor pool;
 final ServerSocket servrerSocket;
 
 final BlockingQueue<Runnable> queue;
 final PoolThreadFactory threadFactory;
 
 public SimpleServer() throws IOException {
  queue    = new ArrayBlockingQueue<Runnable>(blocdingQueueSize);
  threadFactory  = new PoolThreadFactory("SERVER_POOL");
  servrerSocket  = new ServerSocket(port);
  pool    = new ThreadPoolExecutor(corePoolSize,
             xaximumPoolsize,
             keepAliveTime,
             TimeUnit.SECONDS,
             queue,
             threadFactory,
             new ThreadPoolExecutor.CallerRunsPolicy());
 
 }
 
 public void serverStart() {
  try {

   while(true) {
    Socket clientSocket = servrerSocket.accept(); 
    pool.execute(new Work(clientSocket));
   } 
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
 
 public static void main(String[] args) {
  try {
   new SimpleServer().serverStart();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

2. PoolThreadFactory.java
- ThreadFactory 를 상속받아 직접 제작한 스레드를 실행시킨다.
- 스레드 생성과 종료에 따른 로그를 기록하는 스레드를 사용하기 위해서(책 참조)
package study.threadpool.server;

import java.util.concurrent.ThreadFactory;

public class PoolThreadFactory implements ThreadFactory {

 private final String name;
 
 public PoolThreadFactory(String name) {
  this.name = name;
 }
 
 @Override
 public Thread newThread(final Runnable r) {
  final PoolAppThread poolAppThread = new PoolAppThread(r, name);
  PoolAppThread.setDebug(true);   
  return poolAppThread;
 }
}

3. PoolAppThread.java
- 스레드 생성, 종료에 따른 로그를 남기기 위해 제작된 단순한 예제 쓰레드
package study.threadpool.server;

import java.util.concurrent.atomic.AtomicInteger;

public class PoolAppThread extends Thread {

 public  static final String DEFAULT_POOL_NAME  = "PoolAppThread";
 private static volatile boolean debugLifecycle  = false;
 private static final AtomicInteger created   = new AtomicInteger();
 private static final AtomicInteger alive   = new AtomicInteger();
 
 public PoolAppThread(Runnable r) {
  this(r, DEFAULT_POOL_NAME); 
 }
 
 public PoolAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
            
             System.out.println("UNCAUGHT in thread " + t.getName() + ":" + e);
            }
        });
    }
 
 public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) {
         System.out.println("Created : " + getName());        
        }
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) {
             System.out.println("Exiting : " + getName());
            }
        }
    }
 
 public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

4. Work.java
- 받은 데이터 보여주고 헛소리 전달만 하는 무식한넘.
package study.threadpool.server;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;


public class Work implements Runnable {

 
 Socket socket;
 
 public Work(Socket socket) {
  this.socket = socket;
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub
  try {
   PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
   InputStreamReader in = new InputStreamReader(socket.getInputStream());
  
   char[] rBuf = new char[1000];
   int readMsg = in.read(rBuf);
  
   String msg = String.valueOf(rBuf).trim();
  
   System.out.println("read : [" + msg + "]");
  
   try {
    Thread.sleep(50);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  
   out.println("이거나 받으삼\n");
  
  } catch (IOException e) {
   // TODO Auto-generated catch block
   System.err.println("Oh, dear me! " + e);
  }
 }
}

성능을 높이기 위해서 여러가지 방법들이 있는거 같지만. 그때는 직접 하나하나 따려가면서 작업하는게 낳을듯.. 지금은 단순히 샘플만..

 
Posted by 짱가쟁이
ThreadPoolExecutor 는  beforeExecute, afterExecute, terminated 등의 훅 메소드를 제공한다.
ThreadPoolExecutor 클래스를 상속받은 TimingThreadPoolExecutor 를 사용하여 쓰레드의 동작 시간을 알 수 있다.

성능 테스트 시 유용하게 사용할 수 있을듯..(아니면 말구..)

TimingThreadPoolExecutor.java
package study.threadpool.timing;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class TimingThreadPoolExecutor extends ThreadPoolExecutor {

    public TimingThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
 }

 private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime());
    }

    protected synchronized void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected synchronized void terminated() {
        try {
         System.out.println("Terminated:");
         System.out.println(String.format(" total    time=%dns", totalTime.get()));
         System.out.println(String.format(" numTasks time=%d", numTasks.get()));        
            System.out.println(String.format(" avg time=%dns", (totalTime.get() / numTasks.get())));
                    
        } finally {
            super.terminated();
        }
    }
}

Work.java
- 하는일 없이 3초동안 잠만 자는 놈.
package study.threadpool.timing;

public class Work implements Runnable {

 @Override
 public void run() {
  // TODO Auto-generated method stub
  try {
   for(int i = 0;i<3;i++) {
    System.out.println("running...[" + i + "]");
    Thread.sleep(1000);
   }
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

Launcher.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Launcher {

 final ThreadPoolExecutor pool;
 final BlockingQueue<Runnable> queue;

 public Launcher() {
  queue    = new ArrayBlockingQueue<Runnable>(1);
  pool    = new TimingThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, queue);
 }
 
 public void test() {
  for(int i = 0;i<2;i++)
   pool.execute(new Work());
 
  pool.shutdown();
 }
 public static void main(String[] args) {
  new Launcher().test();
 }
}


결과
- 1.5 이상부터 System.nanoTime() 지원 됨.
running...[0]
running...[1]
running...[2]
running...[0]
running...[1]
running...[2]
Terminated:
 total    time=5999990250ns
 numTasks time=2
 avg time=2999995125ns

Posted by 짱가쟁이
정상적으로 종료되는 JVM 은 가장 먼저 종료 훅을 실행 시킨다.
Runtime.getRuntime().addShutdownHook(..) 에 등록된 실행되지 않은 스레드를 의미함.

Example
package study.shutdownhook;

public class Launcher {

 public static void main(String[] args) {
  Runtime.getRuntime().addShutdownHook(new Thread() {
   public void run() {
    System.out.println("종료훅 실행..");
   }
  });
 
  ShutdownHookThread t = new ShutdownHookThread();
  t.start();
 
 
  try {
   Thread.sleep(5000);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  t.interrupt();
 }
}

class ShutdownHookThread extends Thread {
 public void run() {
  try {
   while(!Thread.currentThread().isInterrupted()) {
    System.out.println("running...");
  
    Thread.sleep(1000);
   }
  } catch (InterruptedException ignored) {}
 }
}

Posted by 짱가쟁이
인터럽트에 응답하니께. 작업을 할 수 있는데. 응답하지 못하는 놈들은?
뭐.. 우선은 Future 를 이용해서 작업을 중단해 보더라구..

1. Launcher.java
- newSingleThreadExecutor() 의 Executor 를 생성해서 사용함.
- Timeout Exception 이 발생하면 해당 잡업을 종료하는 무식한 넘임.
package study.interrupt.future;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Launcher {

 private static final ExecutorService exce = Executors.newSingleThreadExecutor();
 
 public static void main(String[] args) {
  BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
 
  Future<?> task = exce.submit(new Task(queue));
  
  try {
   task.get(3, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (ExecutionException e) {
   e.printStackTrace();
  } catch (TimeoutException e) {
   e.printStackTrace();
   task.cancel(true);
  }

  if(task.isCancelled()) {
   System.out.println("종료됨");
  
  }
  for(String str : queue) {
   System.out.print(str + " : " );
  }

 }
}

2. Task.java
- 별루 하는 일 없이 처먹다가 배부르면 안먹는다고 뻐팅기는 지극히 현실적이면서.. 무식한놈.
package study.interrupt.future;

import java.util.concurrent.BlockingQueue;

public class Task implements Runnable {

 private final BlockingQueue<String> queue;
 
 Task(BlockingQueue<String> queue) {
  this.queue = queue;
 }
 
 @Override
 public void run() {
  try {
  
   int index = 0;
  
   while(!Thread.currentThread().isInterrupted()) {
    queue.put("" + index);
    System.out.print(index + " : ");
    index ++;
  
    Thread.sleep(50);
   }
  } catch(InterruptedException e) {
   e.printStackTrace();
  }
 }
}

Posted by 짱가쟁이
wait 상태의 Thread 를 종료하고 싶을 때.. interrupt 를 사용하자.
뭐.. 스레드를 종료할 때 항상 interrupt 를 사용해서 종료하면 좋다고는 함.

1. SampleThread.java
- interrupt 를 사용해서 thread 를 종료한다.
package study.interrupt;

import java.util.concurrent.BlockingQueue;

public class SampleThread extends Thread {

 private final BlockingQueue<String> queue;
 
 SampleThread(BlockingQueue<String> queue) {
  this.queue = queue;
 }
 
 @Override
 public void run() {
  try {
  
   int index = 0;
  
   while(!Thread.currentThread().isInterrupted()) {
    queue.put("" + index);
    System.out.print(index + " : ");
    index ++;
   }
  } catch(InterruptedException e) {
   System.out.println();
  }
 }
 
 public void cancel() {
  interrupt();
 }

}

2. 테스트
package study.interrupt;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Launcher {

 public static void main(String[] args) {
  BlockingQueue<String> queue = new ArrayBlockingQueue(10);
 
  SampleThread t = new SampleThread(queue);
  t.start();
 
  try {
   Thread.sleep(3000);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  t.cancel();

  for(String str : queue) {
   System.out.print(str + " : " );
  }

 }
}

Posted by 짱가쟁이
이번 예제도 "Excutor를 사용한 thread pool 샘플" 을 토대로 작성됨
Executor 를 상속받은 ExecutorService 를 사용하여 pool 의 동작 주기를 관리한다.
void stop(..) 함수는 java api 를 참조함..

1. ConsumerThread.java 수정

package study.concurrrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ConsumerThread implements Runnable {

 Thread thread;

 private static final ExecutorService pool = Executors.newFixedThreadPool(10);
 
 public void start() {
  thread = new Thread(this);
  thread.start();
 }
 
 // java api 참조
 void stop(ExecutorService pool) {
  pool.shutdown(); // Disable new tasks from being submitted
 
  try {
   // Wait a while for existing tasks to terminate
   if (! pool.awaitTermination(60, TimeUnit.SECONDS)) {
    pool.shutdownNow(); // Cancel currently executing tasks
    // Wait a while for tasks to respond to being cancelled
    if (! pool.awaitTermination(60, TimeUnit.SECONDS))
     System.err.println("Pool did not terminate");
   }
  } catch (InterruptedException ie) {
   // (Re-) Cancel if current thread also interrupted
   pool.shutdownNow();
   // Preserve interrupt status
   Thread.currentThread().interrupt();
  }
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub 
  while(!pool.isShutdown()) { 
   try {
    pool.execute(new Worker(BlockingQueueSample.getInstance().getData()));
   } catch(RejectedExecutionException e) {
    if(!pool.isShutdown()) {
     System.out.println("작업이 거부당했구만..");
    }
   }
  }
 }
}




 

 

Posted by 짱가쟁이
전에 작성 했던.. "BolckingQueue 와 producer-consumer 패던 활용법 " 예제를 토대로 Executor로 thread pool 예제를 작성거임.
 
1. ConsumerThread.java  수정
  - 등록된 상품이 있으면 무식하게 가져와서 화면에 보여주던 놈을 조금 수정해서 Worker.java 라는 넘으로 Job을 위임함.
  - Executors.newFixedThreadPool 를 사용해 pool 생성 후 Worker.java 를 실행함.
package study.concurrrency;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ConsumerThread implements Runnable {

 Thread thread;

 private static final Executor exec = Executors.newFixedThreadPool(10);
 
 public void start() {
  thread = new Thread(this);
  thread.start();
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub 
  while(true) {   
   exec.execute(new Worker(BlockingQueueSample.getInstance().getData()));
  }
 }
}

2. Worker.java
  - 넘어온 상품(job)을 무식하게 화면에 보여주는 Thread.
package study.concurrrency;

public class Worker implements Runnable {

 String str;
 
 public Worker(String str) {
  this.str = str;
 }
 @Override
 public void run() {
  // TODO Auto-generated method stub
  System.out.println(str);
  try {
   Thread.sleep(500);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 
 }

}

3. 실행
  - 예전 코드 그대로 실행하면 됨.

4. 결과
  - 위에서 설정했던 pool 사이즈 만큼 job 들이 실행되는 것을 확인할 수 있음.

 
Posted by 짱가쟁이
출처

1. BlockingQueueSample.java
 - 등록된 상품이 있으면 ConsumerThread.java 요넘이 가져가고.. 없으면 대기 시킴.
package study.concurrrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueSample {

 static BlockingQueueSample instance;
 
 BlockingQueue queue = new ArrayBlockingQueue(10);
 
 public synchronized static BlockingQueueSample getInstance() {
  if(instance == null)
   instance = new BlockingQueueSample();
 
  return instance;
 }
 
 public void setData(String arg) {
  try {
   queue.put(arg);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
 
 public String getData() {
  try {
   return (String)queue.take();
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  return null;
 }
}

2. ConsumerThread.java
 - BlockingQueueSample.java 에 등록된 상품이 있으면 무조건 가져오는 무식한 놈임. 이런 소비자만 있으면 무쟈게 행복할듯..
package study.concurrrency;

public class ConsumerThread implements Runnable {

 Thread thread;

 public void start() {
  thread = new Thread(this);
  thread.start();
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub 

  while(true) { 
   System.out.println("aaa : " + BlockingQueueSample.getInstance().getData());
   System.out.println("aaa : " + BlockingQueueSample.getInstance().queue.size());
  
   try {
    Thread.sleep(500);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
 }
}

3. ProducerThread.java
 - 원하는 제품을 무식하게 만들어 주는 놈.
package study.concurrrency;

public class ProducerThread implements Runnable {

 Thread thread;

 String string;
 int time;
 
 public ProducerThread(String string, int time) {
  this.string = string;
  this.time = time;
 }
 public void start() {
  thread = new Thread(this);
  thread.start();
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub 

  int index = 0;
 
  while(index<10) {
   index++;
  
   BlockingQueueSample.getInstance().setData(string + "[" + index + "]");
   try {
    Thread.sleep(time);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
 }
}

4. 테스트
ConsumerThread sample = new ConsumerThread();
sample.start();
 
ProducerThread p1 = new ProducerThread("가방", 1);
p1.start();
 
ProducerThread p2 = new ProducerThread("지갑", 1);
p2.start();

ps.
  위 예제를 참고로 Excutor를 사용해서 Thread Pool을 만들어 봐야 할듯.
Posted by 짱가쟁이
Posted by 짱가쟁이