DotNet Reactive Extension 学习


http://blogs.msdn.com/b/rxteam/
http://www.codeproject.com/KB/Parallel_Programming/RxByExample.aspx
http://msdn.microsoft.com/en-us/data/gg577609
http://stackoverflow.com/questions/1969036/reactive-extensions-rx-and-asynchronous-class

Using .NET 4.0 requires some modifications, which are described below.

1,Observer模式

http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx

自己实现IObserver<T>和IObservable<T>。
其中IObservable的实现类中持有private List<IObserver<T>>,在Subscribe()中调用list.Add(observer),并返回一个IDisposable。
在该IDisposable的Dispose()中调用list.Remove(observer)。
在event发生时,对list中的每个observer调用OnNext(xx)。
   /// <summary>
   /// Notifies every registered observer about a location reading.
   /// </summary>
   /// <param name="loc">
   /// The reading to publish. When it has a value, each observer receives it
   /// through <c>OnNext</c>; otherwise each observer receives a
   /// <c>LocationUnknownException</c> through <c>OnError</c>.
   /// </param>
   public void TrackLocation(Nullable<Location> loc)
   {
      foreach (var subscriber in observers)
      {
         if (loc.HasValue)
         {
            subscriber.OnNext(loc.Value);
         }
         else
         {
            // A separate exception instance is created for each observer.
            subscriber.OnError(new LocationUnknownException());
         }
      }
   }

在IObserver观察者class中,实现OnCompleted, OnError, OnNext

2, 使用Rx, example1
DotNet3.5: add references to System.Reactive, System.CoreEx and System.Threading.
DotNet4.0: add a reference to System.Reactive.

注意Observable这个静态类,在3.5版本中位于System.Linq命名空间(System.Linq.Observable),
在4.0版本中位于System.Reactive.Linq命名空间(System.Reactive.Linq.Observable),两者差别很大。
可使用ILSpy观察这两个版本的system.reactive.dll
另外4.0版本的reactive没有forkjoin,貌似可以直接使用join().

	////////////////////////////
	// Turn the directory enumeration into an observable sequence.
	var result = GetAllDirectories(@"c:\aaa\");
	var observableResult = result.ToObservable();
	// Write every directory path pushed by the sequence to the console.
	observableResult.Subscribe(Console.WriteLine);

	////////////////////////////
	/// <summary>
	/// Lazily enumerates every directory below <paramref name="path"/>, depth-first:
	/// each directory is yielded before its own subdirectories.
	/// </summary>
	/// <param name="path">Directory whose subtree is walked; it is not yielded itself.</param>
	/// <returns>
	/// The directory paths exactly as returned by <c>Directory.GetDirectories</c>.
	/// Because this is an iterator, exceptions thrown by <c>Directory.GetDirectories</c>
	/// surface while the sequence is being enumerated, not when this method is called.
	/// </returns>
	static IEnumerable<string> GetAllDirectories(string path)
	{
		// Directory.GetDirectories returns an empty array (never null) when there are
		// no subdirectories, so no null check is required before iterating.
		foreach (var subdir in Directory.GetDirectories(path))
		{
			yield return subdir;
			foreach (var descendant in GetAllDirectories(subdir))
			{
				yield return descendant;
			}
		}
	}

///////////////////////////
ConsoleOutputter是一个每隔一段时间向stdout写一行内容的控制台程序。
下面的代码启动该进程并重定向它的标准输出,然后通过GetLineReader把StandardOutput包装成IEnumerable<string>,再用ToObservable()转换为IObservable<string>。

	// Configure the child process so that its output streams are redirected;
	// redirection requires UseShellExecute to be false.
	var info = new ProcessStartInfo("./ConsoleOutputter.exe")
	{
		RedirectStandardError = true,
		RedirectStandardOutput = true,
		UseShellExecute = false
	};

	var process = Process.Start(info);
	// Expose the child's standard output, line by line, as an observable sequence.
	var childStdOut = GetLineReader(process.StandardOutput).ToObservable();

	///////////////////////////////////////
    /// <summary>
    /// Lazily yields the lines of <paramref name="reader"/> one at a time.
    /// </summary>
    /// <param name="reader">Reader to pull lines from; it is not disposed here.</param>
    /// <returns>
    /// Each line returned by <c>ReadLine</c>, ending when <c>ReadLine</c> returns
    /// null or when the underlying stream reports that it can no longer be read.
    /// </returns>
    private static IEnumerable<string> GetLineReader(StreamReader reader)
    {
      string line;
      // CanRead is re-checked before every read; a null line ends the sequence.
      while (reader.BaseStream.CanRead && (line = reader.ReadLine()) != null)
      {
        yield return line;
      }
    }

3, 使用Rx, example2
其中bufferedDirectories订阅输出函数的参数为IEnumerable<string>

	// Observable sequence of individual directory paths.
	IObservable<string> directories;
	// Observable sequence of batches of directory paths, built from `directories`.
	IObservable<IEnumerable<string>> bufferedDirectories;
	// Handle of the active subscription; null while nothing is subscribed.
	IDisposable observer;
	////// The constructor creates the two IObservable members.
	public DirectoriesForm()
	{
	  // Wrap the directory enumeration in an observable sequence.
	  directories = System.Reactive.Linq.Observable.ToObservable(GetAllDirectories(@"c:\ov\source\B2bAuthentication Service\"));
	  // Group the paths of `directories` into batches using a one-second time span.
	  bufferedDirectories = System.Reactive.Linq.Observable.Buffer<string>(directories, TimeSpan.FromSeconds(1));

	  ......
	}

	/// <summary>
	/// Stop button handler: disposes the active subscription, if any, and
	/// re-enables the two observe buttons.
	/// </summary>
	private void butStop_Click(object sender, EventArgs e)
    {
      // Nothing is subscribed, so there is nothing to stop.
      if (this.observer == null)
      {
        return;
      }

      // Release the subscription handle and forget it.
      this.observer.Dispose();
      this.observer = null;

      // Return the buttons to the idle state.
      this.butStop.Enabled = false;
      this.butObserverSingle.Enabled = true;
      this.butObserveBuffered.Enabled = true;
    }

    /// <summary>
    /// Clears the tree view and, when no subscription is active, subscribes
    /// <c>outputDirectory</c> to the per-directory sequence.
    /// </summary>
    private void butObserverSingle_Click(object sender, EventArgs e)
    {
      // The tree is always cleared, even if a subscription is already active.
      this.treeViewDirectories.Nodes.Clear();
      if (this.observer != null)
      {
        return;
      }

      // Establish the subscription.
      this.observer = this.directories.Subscribe(outputDirectory);

      // While observing, only the stop button is available.
      this.butStop.Enabled = true;
      this.butObserverSingle.Enabled = false;
      this.butObserveBuffered.Enabled = false;
    }

	/// <summary>
	/// Clears the tree view and, when no subscription is active, subscribes
	/// <c>outputDirectories</c> to the buffered sequence.
	/// </summary>
	private void butObserveBuffered_Click(object sender, EventArgs e)
    {
      // The tree is always cleared, even if a subscription is already active.
      this.treeViewDirectories.Nodes.Clear();
      if (this.observer != null)
      {
        return;
      }

      // Establish the subscription.
      this.observer = this.bufferedDirectories.Subscribe(outputDirectories);

      // While observing, only the stop button is available.
      this.butStop.Enabled = true;
      this.butObserverSingle.Enabled = false;
      this.butObserveBuffered.Enabled = false;
    }

    /// <summary>
    /// Adds a single directory path as a node of the tree view.
    /// </summary>
    /// <param name="path">Directory path to display.</param>
    private void outputDirectory(string path)
    {
      // Notifications may still arrive while the form is being disposed,
      // so do nothing unless the control's handle has been created.
      if (!this.treeViewDirectories.IsHandleCreated)
      {
        return;
      }

      this.treeViewDirectories.Nodes.Add(path);
    }

    /// <summary>
    /// Adds a batch of directory paths to the tree view, wrapping the additions
    /// in a <c>BeginUpdate</c>/<c>EndUpdate</c> pair.
    /// </summary>
    /// <param name="paths">Directory paths to display.</param>
    private void outputDirectories(IEnumerable<string> paths)
    {
      var tree = this.treeViewDirectories;

      // Notifications may still arrive while the form is being disposed,
      // so do nothing unless the control's handle has been created.
      if (!tree.IsHandleCreated)
      {
        return;
      }

      try
      {
        tree.BeginUpdate();
        foreach (var directory in paths)
        {
          tree.Nodes.Add(directory);
        }
      }
      finally
      {
        // EndUpdate always runs, even if adding a node throws.
        tree.EndUpdate();
      }
    }

 https://gist.github.com/973970

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注