java.util.concurrent.locks.Condition
- 하나의 락에 여러 조건으로 대기하게 할 수 있다. (예제 참고)
- 인터럽트에 반응하거나 반응하지 않은 대기 상태를 만들수 있다.
- 데드라인을 정해둔 대기 상태를 만들 수 있다.
package java.util.concurrent.locks;

import java.util.concurrent.*;
import java.util.Date;

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

예제
java api 참조

Condition 객체를 활용하여 put(), take() 에 조건별로 나눠서 대기하기 때문에 siganlAll() 대신 더 효율적인 signal() 을 사용할 수 있게 되었다. 뭐 java api 내용을 보면 ArrayBlockingQueue 에서 이 기능을 하기 때문에 따로 구현하지 말라고 나오기는 함 ㅋㅋ
package study.lock.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<T> {
 final Lock   lock   = new ReentrantLock();
 final Condition notFull  = lock.newCondition();
 final Condition notEmpty = lock.newCondition();
 final T[]   items   = (T[]) new Object[100];
 int tail, head, count;

 public void put(T x) throws InterruptedException {
  lock.lock();
  try {
   while (count == items.length)
    notFull.await();
   items[tail] = x;
   if (++tail == items.length) tail = 0;
   ++count;
   notEmpty.signal();
  } finally {
   lock.unlock();
  }
 }

 public Object take() throws InterruptedException {
  lock.lock();
  try {
   while (count == 0)
    notEmpty.await();
   Object x = items[head];
   if (++head == items.length) head = 0;
   --count;
   notFull.signal();
   return x;
  } finally {
   lock.unlock();
  }
 }
}

참고
- java api
- java concurrency in practice
Posted by 짱가쟁이
1. LockSample.java
- 인터럽트 요청이 올때까지 무식하게 자원만 축내는 넘..
package study.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockSample {

 final Lock lock = new ReentrantLock();
 
 public static Thread thread;
 
 public void lockTest() throws InterruptedException {
 
  if(!lock.tryLock(1, TimeUnit.SECONDS)) {
   System.out.println(Thread.currentThread().getName() + " : 누군가가 선점하고 있다.");
   return;
  }
 
  thread = Thread.currentThread();
 
  System.out.println(Thread.currentThread().getName() + " : 내가 선점하고 있다.");
 
  try {
     // 락을 인터럽트 시킬수 있다.
     lock.lockInterruptibly();
    
   try {
      // critical section
    Thread.sleep(6000);
     } finally {
      // 획 해제
      lock.unlock();
     }
   } catch (InterruptedException e) {
    // 인터럽트 됨.
  
    throw new InterruptedException(Thread.currentThread().getName() + " : 인터럽트 당함");
   }  finally {
    lock.unlock();
   }
 }
}

2. Task.java
- 무식하게 일만 시키는넘
package study.lock;


public class Task extends Thread {
 
 final LockSample sample;
 
 public Task(String name, final LockSample sample) {
  super(name);
  this.sample = sample;
 }
 
 public void run() {

  while(!isInterrupted())  {
   try {

    sample.lockTest();
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    return;
   }
  }
 }
}

3. 테스트
final LockSample sample = new LockSample();
 
Task task = new Task("task1", sample);
task.start();

Task task1 = new Task("task2", sample);
task1.start();

try {
 Thread.sleep(5000);
} catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
}

LockSample.thread.interrupt();

4. 결과
- 한놈이 계속 자원을 사용하고 있으면 다른 한놈은 다른 놈이 자원을 사용하고 있다고 울부짓는다. 중간에 인터럽트를 발생시키면 못먹겠다고 울부짖던 놈이 이번엔 지가 처먹는다. 굉장히 단순한넘인디..
task2 : 내가 선점하고 있다.
task1 : 누군가가 선점하고 있다.
task1 : 누군가가 선점하고 있다.
task1 : 누군가가 선점하고 있다.
task1 : 누군가가 선점하고 있다.
task1 : 내가 선점하고 있다.
java.lang.InterruptedException: task2 : 인터럽트 당함
task1 : 내가 선점하고 있다.
task1 : 내가 선점하고 있다.
task1 : 내가 선점하고 있다.
task1 : 내가 선점하고 있다.
task1 : 내가 선점하고 있다.

ps.
초기에는 단순히 여기 저기에 돌아다니는 예제를 참고로 작업을 하다보니 Lock 을 설정만 하고 해제를 제대로 못한 코드가 나온적이 있었다. 예를 들어 여러개의 스레드를 생성해서 테스트를 하다보면 lock을 선점하고 있던 스레드에 인터럽트를 보내서 스레드를 강제 종료 시켜도 다른 스레드에서 해당 lock 이 이미 선점되어 사용할 수 없다는 문제가 발생된다. lock을 설정해서 사용하는것도 중요하지만 해제를 꼭 해줘야 하는 부분이 synchronized 보다 많이 불편하다.

참고
http://blog.sdnkorea.com/blog/178?category=15
java concurrency in practice

Posted by 짱가쟁이
LOCKS(락)
java concurrency in practice 를 읽다보면 락 클래스가 어디에 사용되는지 자세히 설명해 놓았다. 본문을 참고로 왜 명시적인 락 클래스를 따로 만들었는지 요약해보겠다.
암묵적인 락 기능이 있는데 왜 명시적인 락 클래스를 따로 만들었을까? 암묵적인 락(synchronized)로 수용할 수 없는 문제(기능적으로 제한되는 경우)가 발생된다.

ex> 락을 확보하고자 대기하고 있는 쓰레드에 인터럽트를 걸거나, 대기 상태에 들어가지 않으면서 락을 확보하는 방법

성능
- 자바 5.0 에서는 ReentrantLock 은 암묵적인 락에 비해 훨씬 낳은 경쟁 성능(contended performance)을 보여준다.

자바 6 의 암묵적인 락은 개선되어 좀더 낳은 성능을 보임

- 블록 동기화 기능으로 synchronized 보다 진보된 능력을 제공한다. 락 타임아웃, 락당 복수의 조건 변수, 읽기/쓰기 락, 락 대기중인 스레드의 인터럽트 지원

- 전형적인 락 사용 패턴
 락 획득 -> 보호된 리소스에 엑세스 -> 락 해제
Lock lock = new ReentrantLock();
// 락 획득
lock.lock();
try {
 // critical section
} finally {
 // 획 해제
 lock.unlock();
}

- lockInterruptibly()
대기중인 스레드에 인터럽트할 수 있게 해준다. interrupt() 가 호출되면 대기 상태가 입터럽트되며 InterruptedException 이 던져진다.

락 획득이 실패할 경우에 unlock() 이 호출되면 안되기 때문에 try-finally, try-catch 로 나눠서 예외를 처리.
Lock lock = new ReentrantLock(); 

// 락을 인터럽트 시킬수 있다.
lock.lockInterruptibly();

try {
    // critical section
} finally {
    // 획 해제
    lock.unlock();
}

- tryLock()
 lock.tryLock()
  : 락이 가용 상태가 아닐 경우 즉시 실패 처림 됨.

 lock.tryLock(second, TimeUnit.SECOND)
  : 3초 동안 락 획득을 시도하고 락을 얻을 수 없으면 false를 반환하여 락을 얻을 수 없음을 알린다.
Lock lock = new ReentrantLock(); 

if(lock.tryLock()) return;

try {
    // critical section
} finally {
    // 획 해제
    lock.unlock();
}

- java.util.concurrent.locks 의 Lock 인터페이스
 1. ReentrantLock
  : 일반적으로 사용되는 락 (synchronized 랑 비슷한 유형일듯)
 
 2. ReentrantReadWriteLock 의 ReadLock , WriteLock
  : 읽고/쓰기 의 사용 비율에 따라 골라서 사용함
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock  = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();

ReentractLock 과 동일하게 사용.


Posted by 짱가쟁이
가끔 동일한 작업을 반복적으로 수행할 때 ExecutorService 를 사용하면 순차적으로 수행하는 것 보다 낳은 성능을 볼 수 있을 것임.

또한 계산 모듈이 종료되고 나온 결과값으로 다음 작업을 수행할 때 invokeAll() 은 유용할듯.

1. InvokeAllTest.java
package study.concurrency.invoke;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllTest {

 final ExecutorService executorService  = Executors.newFixedThreadPool(5);
 final List<WorkerTask> list    = new ArrayList<WorkerTask>();
 
 public void makeTaskList() {
  for(int i = 0;i<10;i++) {
   list.add(new WorkerTask());
  }
 }
 
 public void test() {
  try { 
   List<Future> futureList = executorService.invokeAll((Collection)list);
  
   try {
    for(Future future : futureList ) {
     System.out.println(future.get());
    }
   } catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
   executorService.shutdown();

  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

  System.out.println("Completed");
 }
 
 public static void main(String[] args) {
  new Launcher().test();
 }
}

2. WorkerTask.java
package study.concurrency.invoke;

import java.util.concurrent.Callable;

public class WorkerTask implements Callable<String> {

 public String call() throws Exception {
  // TODO Auto-generated method stub
  for(int i = 0;i<5;i++) {
   System.out.println(Thread.currentThread().getName() + ":" + "running...");
   Thread.sleep(500);
  }
  return Thread.currentThread().getName() + ":" + "success";
 }
}

Posted by 짱가쟁이
몇년전쯤에 한 선배 왈..
"너는 기본이 부족한거 같다."

처음 그 소리를 들었을때 느껴지는 그 쓸쓸함이란..

요 몇일 java performance fundamental, java concurrency in practice 를 읽으면서 그 선배의 말이 뇌리를 울리는건 무엇일까? 반성하고 또 반성해야겠다.

항상 잊어버리는 누군가를 위해서.. 작업물을 올린다.

1. SimpleServer.java
- 간단한 서버소켓 샘플에 스레드풀만 추가한 단순한 모습.
package study.threadpool.server;

import java.io.IOException;


import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SimpleServer {

 // 스레드 풀은 실행할 작업이 없어도 corePoolSize 를 유지한다.
 // 최초에 corePoolSize 만큼 스레드를 생성하지 않고 작업이 실행 될 때 corePoolSize 만큼 차례대로 생성함.
 // prestartAllCoreThreads() 를 사용하면 corePoolSize 만큼 스레드를 미리 생성함.
 final int corePoolSize   = 10;
 // xaximumPoolsize 는 동시에 얼마나 많은 개수의 스레드가 동작할 수 있는지를 제한하는 최대값.
 final int xaximumPoolsize  = 10;
 final int blocdingQueueSize = 100;
 final int port    = 10001;
 // keepAliveTime 는 스레드 유지 시간으로 스레드가 keepAliveTime 이상 대기하고 있으면
 // 해당 스레드는 제거 될 수 있음. 풀의 스레드 개수가 corePoolSize 를 넘어서면 제거될 수 있음.
 final long keepAliveTime = 30L;

 final ThreadPoolExecutor pool;
 final ServerSocket servrerSocket;
 
 final BlockingQueue<Runnable> queue;
 final PoolThreadFactory threadFactory;
 
 public SimpleServer() throws IOException {
  queue    = new ArrayBlockingQueue<Runnable>(blocdingQueueSize);
  threadFactory  = new PoolThreadFactory("SERVER_POOL");
  servrerSocket  = new ServerSocket(port);
  pool    = new ThreadPoolExecutor(corePoolSize,
             xaximumPoolsize,
             keepAliveTime,
             TimeUnit.SECONDS,
             queue,
             threadFactory,
             new ThreadPoolExecutor.CallerRunsPolicy());
 
 }
 
 public void serverStart() {
  try {

   while(true) {
    Socket clientSocket = servrerSocket.accept(); 
    pool.execute(new Work(clientSocket));
   } 
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
 
 public static void main(String[] args) {
  try {
   new SimpleServer().serverStart();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

2. PoolThreadFactory.java
- ThreadFactory 를 상속받아 직접 제작한 스레드를 실행시킨다.
- 스레드 생성과 종료에 따른 로그를 기록하는 스레드를 사용하기 위해서(책 참조)
package study.threadpool.server;

import java.util.concurrent.ThreadFactory;

public class PoolThreadFactory implements ThreadFactory {

 private final String name;
 
 public PoolThreadFactory(String name) {
  this.name = name;
 }
 
 @Override
 public Thread newThread(final Runnable r) {
  final PoolAppThread poolAppThread = new PoolAppThread(r, name);
  PoolAppThread.setDebug(true);   
  return poolAppThread;
 }
}

3. PoolAppThread.java
- 스레드 생성, 종료에 따른 로그를 남기기 위해 제작된 단순한 예제 쓰레드
package study.threadpool.server;

import java.util.concurrent.atomic.AtomicInteger;

public class PoolAppThread extends Thread {

 public  static final String DEFAULT_POOL_NAME  = "PoolAppThread";
 private static volatile boolean debugLifecycle  = false;
 private static final AtomicInteger created   = new AtomicInteger();
 private static final AtomicInteger alive   = new AtomicInteger();
 
 public PoolAppThread(Runnable r) {
  this(r, DEFAULT_POOL_NAME); 
 }
 
 public PoolAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
            
             System.out.println("UNCAUGHT in thread " + t.getName() + ":" + e);
            }
        });
    }
 
 public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) {
         System.out.println("Created : " + getName());        
        }
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) {
             System.out.println("Exiting : " + getName());
            }
        }
    }
 
 public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

4. Work.java
- 받은 데이터 보여주고 헛소리 전달만 하는 무식한넘.
package study.threadpool.server;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;


public class Work implements Runnable {

 
 Socket socket;
 
 public Work(Socket socket) {
  this.socket = socket;
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub
  try {
   PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
   InputStreamReader in = new InputStreamReader(socket.getInputStream());
  
   char[] rBuf = new char[1000];
   int readMsg = in.read(rBuf);
  
   String msg = String.valueOf(rBuf).trim();
  
   System.out.println("read : [" + msg + "]");
  
   try {
    Thread.sleep(50);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  
   out.println("이거나 받으삼\n");
  
  } catch (IOException e) {
   // TODO Auto-generated catch block
   System.err.println("Oh, dear me! " + e);
  }
 }
}

성능을 높이기 위해서 여러가지 방법들이 있는거 같지만. 그때는 직접 하나하나 따려가면서 작업하는게 낳을듯.. 지금은 단순히 샘플만..

 
Posted by 짱가쟁이
ThreadPoolExecutor 는  beforeExecute, afterExecute, terminated 등의 훅 메소드를 제공한다.
ThreadPoolExecutor 클래스를 상속받은 TimingThreadPoolExecutor 를 사용하여 쓰레드의 동작 시간을 알 수 있다.

성능 테스트 시 유용하게 사용할 수 있을듯..(아니면 말구..)

TimingThreadPoolExecutor.java
package study.threadpool.timing;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class TimingThreadPoolExecutor extends ThreadPoolExecutor {

    public TimingThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
 }

 private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime());
    }

    protected synchronized void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected synchronized void terminated() {
        try {
         System.out.println("Terminated:");
         System.out.println(String.format(" total    time=%dns", totalTime.get()));
         System.out.println(String.format(" numTasks time=%d", numTasks.get()));        
            System.out.println(String.format(" avg time=%dns", (totalTime.get() / numTasks.get())));
                    
        } finally {
            super.terminated();
        }
    }
}

Work.java
- 하는일 없이 3초동안 잠만 자는 놈.
package study.threadpool.timing;

public class Work implements Runnable {

 @Override
 public void run() {
  // TODO Auto-generated method stub
  try {
   for(int i = 0;i<3;i++) {
    System.out.println("running...[" + i + "]");
    Thread.sleep(1000);
   }
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

Launcher.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Launcher {

 final ThreadPoolExecutor pool;
 final BlockingQueue<Runnable> queue;

 public Launcher() {
  queue    = new ArrayBlockingQueue<Runnable>(1);
  pool    = new TimingThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, queue);
 }
 
 public void test() {
  for(int i = 0;i<2;i++)
   pool.execute(new Work());
 
  pool.shutdown();
 }
 public static void main(String[] args) {
  new Launcher().test();
 }
}


결과
- 1.5 이상부터 System.nanoTime() 지원 됨.
running...[0]
running...[1]
running...[2]
running...[0]
running...[1]
running...[2]
Terminated:
 total    time=5999990250ns
 numTasks time=2
 avg time=2999995125ns

Posted by 짱가쟁이
정상적으로 종료되는 JVM 은 가장 먼저 종료 훅을 실행 시킨다.
Runtime.getRuntime().addShutdownHook(..) 에 등록된 실행되지 않은 스레드를 의미함.

Example
package study.shutdownhook;

public class Launcher {

 public static void main(String[] args) {
  Runtime.getRuntime().addShutdownHook(new Thread() {
   public void run() {
    System.out.println("종료훅 실행..");
   }
  });
 
  ShutdownHookThread t = new ShutdownHookThread();
  t.start();
 
 
  try {
   Thread.sleep(5000);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  t.interrupt();
 }
}

class ShutdownHookThread extends Thread {
 public void run() {
  try {
   while(!Thread.currentThread().isInterrupted()) {
    System.out.println("running...");
  
    Thread.sleep(1000);
   }
  } catch (InterruptedException ignored) {}
 }
}

Posted by 짱가쟁이
인터럽트에 응답하니께. 작업을 할 수 있는데. 응답하지 못하는 놈들은?
뭐.. 우선은 Future 를 이용해서 작업을 중단해 보더라구..

1. Launcher.java
- newSingleThreadExecutor() 의 Executor 를 생성해서 사용함.
- Timeout Exception 이 발생하면 해당 잡업을 종료하는 무식한 넘임.
package study.interrupt.future;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Launcher {

 private static final ExecutorService exce = Executors.newSingleThreadExecutor();
 
 public static void main(String[] args) {
  BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
 
  Future<?> task = exce.submit(new Task(queue));
  
  try {
   task.get(3, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (ExecutionException e) {
   e.printStackTrace();
  } catch (TimeoutException e) {
   e.printStackTrace();
   task.cancel(true);
  }

  if(task.isCancelled()) {
   System.out.println("종료됨");
  
  }
  for(String str : queue) {
   System.out.print(str + " : " );
  }

 }
}

2. Task.java
- 별루 하는 일 없이 처먹다가 배부르면 안먹는다고 뻐팅기는 지극히 현실적이면서.. 무식한놈.
package study.interrupt.future;

import java.util.concurrent.BlockingQueue;

public class Task implements Runnable {

 private final BlockingQueue<String> queue;
 
 Task(BlockingQueue<String> queue) {
  this.queue = queue;
 }
 
 @Override
 public void run() {
  try {
  
   int index = 0;
  
   while(!Thread.currentThread().isInterrupted()) {
    queue.put("" + index);
    System.out.print(index + " : ");
    index ++;
  
    Thread.sleep(50);
   }
  } catch(InterruptedException e) {
   e.printStackTrace();
  }
 }
}

Posted by 짱가쟁이
wait 상태의 Thread 를 종료하고 싶을 때.. interrupt 를 사용하자.
뭐.. 스레드를 종료할 때 항상 interrupt 를 사용해서 종료하면 좋다고는 함.

1. SampleThread.java
- interrupt 를 사용해서 thread 를 종료한다.
package study.interrupt;

import java.util.concurrent.BlockingQueue;

public class SampleThread extends Thread {

 private final BlockingQueue<String> queue;
 
 SampleThread(BlockingQueue<String> queue) {
  this.queue = queue;
 }
 
 @Override
 public void run() {
  try {
  
   int index = 0;
  
   while(!Thread.currentThread().isInterrupted()) {
    queue.put("" + index);
    System.out.print(index + " : ");
    index ++;
   }
  } catch(InterruptedException e) {
   System.out.println();
  }
 }
 
 public void cancel() {
  interrupt();
 }

}

2. 테스트
package study.interrupt;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Launcher {

 public static void main(String[] args) {
  BlockingQueue<String> queue = new ArrayBlockingQueue(10);
 
  SampleThread t = new SampleThread(queue);
  t.start();
 
  try {
   Thread.sleep(3000);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  t.cancel();

  for(String str : queue) {
   System.out.print(str + " : " );
  }

 }
}

Posted by 짱가쟁이
이번 예제도 "Excutor를 사용한 thread pool 샘플" 을 토대로 작성됨
Executor 를 상속받은 ExecutorService 를 사용하여 pool 의 동작 주기를 관리한다.
void stop(..) 함수는 java api 를 참조함..

1. ConsumerThread.java 수정

package study.concurrrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ConsumerThread implements Runnable {

 Thread thread;

 private static final ExecutorService pool = Executors.newFixedThreadPool(10);
 
 public void start() {
  thread = new Thread(this);
  thread.start();
 }
 
 // java api 참조
 void stop(ExecutorService pool) {
  pool.shutdown(); // Disable new tasks from being submitted
 
  try {
   // Wait a while for existing tasks to terminate
   if (! pool.awaitTermination(60, TimeUnit.SECONDS)) {
    pool.shutdownNow(); // Cancel currently executing tasks
    // Wait a while for tasks to respond to being cancelled
    if (! pool.awaitTermination(60, TimeUnit.SECONDS))
     System.err.println("Pool did not terminate");
   }
  } catch (InterruptedException ie) {
   // (Re-) Cancel if current thread also interrupted
   pool.shutdownNow();
   // Preserve interrupt status
   Thread.currentThread().interrupt();
  }
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub 
  while(!pool.isShutdown()) { 
   try {
    pool.execute(new Worker(BlockingQueueSample.getInstance().getData()));
   } catch(RejectedExecutionException e) {
    if(!pool.isShutdown()) {
     System.out.println("작업이 거부당했구만..");
    }
   }
  }
 }
}




 

 

Posted by 짱가쟁이