2011年10月5日水曜日

仕様変更に伴うTwitterStreamingReaderの変更

どうやら,Twitterで仕様変更があったらしく,
StreamingAPIを使ったデータの読み込みを変更しなければいけなくなりました.

具体的には,TwitterのDeveloper用ブログにあるとおり,
All our Streaming API products are now supporting SSL and we've just updated the Streaming API Methods, User Streams and Site Streams documentation pages accordingly. As we're planning to sunset HTTP support in about a month, we strongly encourage you to switch to SSL (HTTPS) as soon as possible, especially if you're still authenticating your Streaming API requests with Basic Auth.
ということで,SSLを使ってね,ということらしいです.

そんなわけで,以前作成したTwitterStreamReaderではなく,新しいバージョンを使ってください.

変更点は,SSLに対応させただけだけど.
得られるTweetデータはJSON形式なので,jsonicなどを使ってJSONの処理をちゃんとしないと日本語が文字化けするようです.
また,日本語のTweetだけ取り出したい人は,文字列に日本語が含まれるかどうかの判定をご参照ください.

package twitter;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * StreamAPIから送られてくるStreamデータをひたすら取得し続けるクラス
 * @author tori
 *
 */
public class TwitterStreamReader {


 /**
  * 使い方の例
  * @param args
  * @throws UnsupportedEncodingException 
  */
 public static void main(String[] args) throws UnsupportedEncodingException {
  
  TwitterStreamReader tsr = new TwitterStreamReader("Twitter-UserName", "Password");
  tsr.start();
  
  while(true){
   List<String> tweetList = tsr.readStoredList();
   List<String> errList = tsr.readErrLogList();
   
   for(String text:tweetList){
    System.out.println(text);
   }
   
   for(String err:errList){
    System.err.println(err);
   }
  }
 }
 
 /**
  * sampleのタイムライン取得のためのURL
  */
 private static final String DEFAULT_API_URL = "https://stream.twitter.com/1/statuses/sample.json";

 String userName;
 String password;

 /**
  * 強制終了させられたかどうか
  */
 boolean isForceStop;

 /**
  * 取得しているかどうか
  */
 boolean isRunning;
 
 /**
  * Synchlonizedされたリスト
  */
 List<String> dataList;

 /**
  * Synchlonizedされたリスト
  */
 List<String> errLogList;

 /**
  * apiのURL
  */
 String apiUrl = DEFAULT_API_URL;

 /**
  * ストリームを読み続けるためのスレッド
  */
 private Thread streamReadThread;
 
 public TwitterStreamReader(String userName, String password) {
  super();
  this.userName = userName;
  this.password = password;

  dataList = Collections.synchronizedList(new ArrayList<String>());
  errLogList = Collections.synchronizedList(new ArrayList<String>());
  
 }

 /**
  * 読み込みを開始する
  */
 public void start(){
  StreamReader streamReader = new StreamReader();
  
  streamReadThread = new Thread(streamReader);
  streamReadThread.start();
 }

 /**
  * 取得を停止する
*/
 public void stop() {
  this.isForceStop = true;
 }
 
 /**
  * データ取得中かどうかを返す
  * @return
  */
 public boolean isRunning(){
  return streamReadThread.isAlive();
 }
 
 /**
  * 取得済みのTweetデータのリストを取得する
* これまでに読み込んだデータは削除される
  * @return 是までに読み込んだデータ
  */
 public List<String> readStoredList(){
  List<String> storedList;
  synchronized (dataList) {
   storedList = new ArrayList<String>(dataList);
   dataList.clear();
  }

  return storedList;
 }
 
 /**
  * エラーログのリストを取得する
* これまでに読み込んだデータは削除される
  * @return 是までに読み込んだデータ
  */
 public List<String> readErrLogList(){
  List<String> storedList;
  synchronized (errLogList) {
   storedList = new ArrayList<String>(errLogList);
   errLogList.clear();
  }
  return storedList;
 }

 /**
  * 強制停止信号を送ったかどうか
  * @return the isForceStop
  */
 public boolean isForceStop() {
  return isForceStop;
 }
 
 /**
  * データ読み込み用クラス
  * @author tori
  *
  */
 class StreamReader implements Runnable{
  @Override
  public void run() {
   isForceStop = false;
   while(!isForceStop){
    InputStreamReader isr = null;
    BufferedReader br = null;
    try{
     URL connectUrl = new URL(apiUrl);
     HttpURLConnection con = (HttpURLConnection)connectUrl.openConnection();
     
     con.setRequestMethod("GET");
     con.setDoOutput(true);
     con.setInstanceFollowRedirects(true); 

     Authenticator auth = new Authenticator(){
      public PasswordAuthentication getPasswordAuthentication(){
       return new PasswordAuthentication(userName, password.toCharArray());
      }
     };
     Authenticator.setDefault(auth);
     

     isr = new InputStreamReader(con.getInputStream(), "UTF8");
     br = new BufferedReader(isr);
     while(!isForceStop){
      String tweet = br.readLine();
      synchronized (dataList) {
       dataList.add(tweet);
      }
     }
    }catch(Exception e){
     StringBuffer buf = new StringBuffer();
     buf.append(e.getClass().getName()+"\n");
     for(StackTraceElement ste:e.getStackTrace()){
      buf.append(String.format("\tat %s.%s(%s:%d)\n", ste.getClassName(), ste.getMethodName(), ste.getFileName(), ste.getLineNumber()));
     }
     synchronized (errLogList) {
      errLogList.add(buf.toString());
     }
    }finally{
     try{
      br.close();
     }catch(Exception e){
     }
     try{
      isr.close();
     }catch(Exception e){
     }
    }
   }
  }
 }
}