DotNet Reactive Extension 学习

http://blogs.msdn.com/b/rxteam/
http://www.codeproject.com/KB/Parallel_Programming/RxByExample.aspx
http://msdn.microsoft.com/en-us/data/gg577609
http://stackoverflow.com/questions/1969036/reactive-extensions-rx-and-asynchronous-class

Using dotnet4.0 needs some modifications !! Follow me.

1,Observer模式

http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx

自己实现IObserver和IObserable
其中IObserable中private list<T>,在Subscribe()调用list.Add(xx).
在IDispose的Dispose()中调用list.Remove(xx).
在event发生时调用list.OnNext(xx).
   public void TrackLocation(Nullable<Location> loc)
   {
      foreach (var observer in observers) {
         if (! loc.HasValue)
            observer.OnError(new LocationUnknownException());
         else
            observer.OnNext(loc.Value);
      }
   }

在IObserver观察者class中,实现OnComplete, OnError, OnNext

2, 使用Rx, example1
DotNet3.5 add reference system.reactive; system.coreex, system.threading.
DotNet4.0 add system.reactive;

注意Observable这个静态类,在3.5中属于System.Linq.Observable,
在4.0时候属于System.Reactive.Linq.Observable,非常之不一样。
可使用ILSpy观察这两个版本的system.reactive.dll
另外4.0版本的reactive没有forkjoin,貌似可以直接使用join().

	////////////////////////////
	var result = GetAllDirectories(@"c:\aaa\");
	var observableResult = result.ToObservable();
	observableResult.Subscribe((line)=>
		{
		Console.WriteLine(line);
	});

	////////////////////////////
	static IEnumerable<string> GetAllDirectories(string path)
	{
		string[] subdirs = null;
		subdirs = Directory.GetDirectories(path);
		if (subdirs != null)
		{
			foreach (var subdir in subdirs)
			{
				yield return subdir;
				foreach (var grandchild in GetAllDirectories(subdir))
				{
					yield return grandchild;
				}
			}
		}
	}

///////////////////////////
在ConsoleOutputter中,间隔一段时间向stdout写内容。
在下面代码将process重定向到StandardOutput,然后通过GetLineReader得到IEnumerable接口。

	var info = new ProcessStartInfo("./ConsoleOutputter.exe");
    info.RedirectStandardError = true;
    info.RedirectStandardOutput = true;
    info.UseShellExecute = false;

    var process = Process.Start(info);
	var childStdOut = GetLineReader(process.StandardOutput).ToObservable();

	///////////////////////////////////////
    private static IEnumerable<string> GetLineReader(StreamReader reader)
    {
      while (reader.BaseStream.CanRead)
      {
        var l = reader.ReadLine();
        if (l == null)
        {
          break;
        }
        yield return l;
      }
    }

3, 使用Rx, example2
其中bufferedDirectories订阅输出函数的参数为IEnumerable<string>

	IObservable<string> directories;
	IObservable<IEnumerable<string>> bufferedDirectories;
	IDisposable observer;
	////// 在构造函数中,创建两个IEnumerable成员。
	public DirectoriesForm()
	{
	  directories = System.Reactive.Linq.Observable.ToObservable(GetAllDirectories(@"c:\ov\source\B2bAuthentication Service\"));
	  bufferedDirectories = System.Reactive.Linq.Observable.Buffer<string>(directories, TimeSpan.FromSeconds(1));

	  ......
	}

	private void butStop_Click(object sender, EventArgs e)
    {
      if (this.observer != null)
      {
        this.observer.Dispose();
        this.observer = null;
        this.butStop.Enabled = false;
        this.butObserverSingle.Enabled = true;
        this.butObserveBuffered.Enabled = true;
      }
    }

    private void butObserverSingle_Click(object sender, EventArgs e)
    {
      this.treeViewDirectories.Nodes.Clear();
      if (this.observer == null)
      {	    
		// 建立订阅关系 !!!!!!!!!!!!!!!!!

        this.observer = this.directories.Subscribe(outputDirectory);
        this.butStop.Enabled = true;
        this.butObserverSingle.Enabled = false;
        this.butObserveBuffered.Enabled = false;
      }
    }	

	private void butObserveBuffered_Click(object sender, EventArgs e)
    {
      this.treeViewDirectories.Nodes.Clear();
      if (this.observer == null)
      {
        // 建立订阅关系 !!!!!!!!!!!!!!!!!

        this.observer = this.bufferedDirectories.Subscribe(outputDirectories);
        this.butStop.Enabled = true;
        this.butObserverSingle.Enabled = false;
        this.butObserveBuffered.Enabled = false;
      }
    }

    private void outputDirectory(string path)
    {
      // We check to see if the handle is created because when 
      // the form is disposing this may still be trying to observe.
      if (this.treeViewDirectories.IsHandleCreated)
      {
        this.treeViewDirectories.Nodes.Add(path);
      }
    }

    private void outputDirectories(IEnumerable<string> paths)
    {
      // We check to see if the handle is created because when 
      // the form is disposing this may still be trying to observe.
      if (this.treeViewDirectories.IsHandleCreated)
      {
        try
        {
          this.treeViewDirectories.BeginUpdate();
          foreach (var path in paths)
          {
            this.treeViewDirectories.Nodes.Add(path);
          }
        }
        finally
        {
          this.treeViewDirectories.EndUpdate();
        }
      }
    }

 https://gist.github.com/973970

GAE同步twitter消息到新浪微博

google appengine(python)是一个极帅极酷的平台,通过它,你可以把很多任务非常简单的实现,而且有很多现成的工具、代码、库可以使用。

我一直想做一个twitter到新浪微博的同步程序,多少也是为了活跃一下人气。费了不少力气终于搞定了。

 

如果你想复制拷贝我的成功,那太容易了,听我道来。

首先你需要有一个google appengine账号,在上面部署@livid开发的v2ex程序(v2ex.com)。我自己使用的是hack过的版本,对于twitter功能大大增强。如果你只是为了转发,那么不需要修改很多。只要抓取最新的v2ex就行了。https://github.com/livid/v2ex/zipball/master 

另外你要有twitter开发者api,否则无法搭建自己的程序。

部署成功以后,一般国内你还需要搞个vps,装个nginx,然后实现gae的反向代理,否则国内是无法访问的。这点比较敏感,大家自行搜索研究吧。如果没有vps,那你就必须会翻墙访问你自己这个服务。另外还要说明的是,v2ex绑定twitter也需要能访问twitter.com,然后就不需要了。

再到这里https://github.com/PinkyJie/Tui2Lang下载代码,主要是借用它的代码实现。

你还需要有个新浪微博,另外需要申请新浪微博的开发者身份,申请建立一个新浪微博的应用。

 

是不是感觉有些麻烦,其实说着难,做着也还好,我这些账号在这些服务开放的时候都申请了,所以没觉得怎样。

 

在v2ex代码中有个t.py,就是负责twitter工作,编辑它。

搜索“twitter.PostUpdate”,应该会找到TwitterTweetHandler这个类,然后加入类似下面的代码:

auth = OAuthHandler(‘……’, ‘……’)
auth.setToken(‘……’, ‘……’)
api = API(auth)
api.update_status(status.encode(‘utf-8’))

如果你还没有拿到新浪微博应用的token,还需要额外加入验证,这个基本上抄袭v2ex中的twitter验证部分就可以了。

 

在开头部分需要加入

sys.path.insert(0, ‘weibopy.zip’)
sys.path.append(‘weibopy.zip/weibopy’)

from weibopy.auth import OAuthHandler
from weibopy.api import API
from weibopy.error import WeibopError

这就可以了。

 

我试过其它几种方法,都不是很好用,但是可以列出了让大家看看。

第一种使用了gaemechanize这个开源库,需要在新浪SAE上建立一个简单的post form。可以用,但可用性极差。

br = mechanize.Browser()
br.open(“http://xxx.sinaapp.com/weibopost.php”)
br.select_form(nr=0)
str = status.encode(‘utf-8’)
br.form[‘text’] = str
br.submit()

另外想尝试通过xmlrpc访问SAE上的rpcserver,这个没成功。另外,如果想建立php的rpcserver client,从wordpress摘出来的最方便。

rpc_server = xmlrpclib.Server(‘http://xxx.sinaapp.com/rpcserver1.php’)
result = rpc_server.NewPost(status.encode(‘utf-8’))

 

另外尝试了https://github.com/lyxint/wet的代码,也是不成。
send_sina_msgs(‘aa@bb.com’, ‘password’, status.encode(‘utf-8’))

下一步打算同步到腾讯微博,这个基本上与新浪一样一样地,就不多写了。