http://blogs.msdn.com/b/rxteam/ http://www.codeproject.com/KB/Parallel_Programming/RxByExample.aspx http://msdn.microsoft.com/en-us/data/gg577609 http://stackoverflow.com/questions/1969036/reactive-extensions-rx-and-asynchronous-class Using dotnet4.0 needs some modifications !! Follow me. 1,Observer模式 http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx 自己实现IObserver和IObserable 其中IObserable中private list<T>,在Subscribe()调用list.Add(xx). 在IDispose的Dispose()中调用list.Remove(xx). 在event发生时调用list.OnNext(xx). public void TrackLocation(Nullable<Location> loc) { foreach (var observer in observers) { if (! loc.HasValue) observer.OnError(new LocationUnknownException()); else observer.OnNext(loc.Value); } } 在IObserver观察者class中,实现OnComplete, OnError, OnNext 2, 使用Rx, example1 DotNet3.5 add reference system.reactive; system.coreex, system.threading. DotNet4.0 add system.reactive; 注意Observable这个静态类,在3.5中属于System.Linq.Observable, 在4.0时候属于System.Reactive.Linq.Observable,非常之不一样。 可使用ILSpy观察这两个版本的system.reactive.dll 另外4.0版本的reactive没有forkjoin,貌似可以直接使用join(). //////////////////////////// var result = GetAllDirectories(@"c:\aaa\"); var observableResult = result.ToObservable(); observableResult.Subscribe((line)=> { Console.WriteLine(line); }); //////////////////////////// static IEnumerable<string> GetAllDirectories(string path) { string[] subdirs = null; subdirs = Directory.GetDirectories(path); if (subdirs != null) { foreach (var subdir in subdirs) { yield return subdir; foreach (var grandchild in GetAllDirectories(subdir)) { yield return grandchild; } } } } /////////////////////////// 在ConsoleOutputter中,间隔一段时间向stdout写内容。 在下面代码将process重定向到StandardOutput,然后通过GetLineReader得到IEnumerable接口。 var info = new ProcessStartInfo("./ConsoleOutputter.exe"); info.RedirectStandardError = true; info.RedirectStandardOutput = true; info.UseShellExecute = false; var process = Process.Start(info); var childStdOut = GetLineReader(process.StandardOutput).ToObservable(); /////////////////////////////////////// private static IEnumerable<string> GetLineReader(StreamReader reader) { while (reader.BaseStream.CanRead) { var l = reader.ReadLine(); if (l == null) { break; } yield return l; } } 3, 使用Rx, example2 其中bufferedDirectories订阅输出函数的参数为IEnumerable<string> IObservable<string> directories; IObservable<IEnumerable<string>> bufferedDirectories; IDisposable observer; ////// 在构造函数中,创建两个IEnumerable成员。 public DirectoriesForm() { directories = System.Reactive.Linq.Observable.ToObservable(GetAllDirectories(@"c:\ov\source\B2bAuthentication Service\")); bufferedDirectories = System.Reactive.Linq.Observable.Buffer<string>(directories, TimeSpan.FromSeconds(1)); ...... } private void butStop_Click(object sender, EventArgs e) { if (this.observer != null) { this.observer.Dispose(); this.observer = null; this.butStop.Enabled = false; this.butObserverSingle.Enabled = true; this.butObserveBuffered.Enabled = true; } } private void butObserverSingle_Click(object sender, EventArgs e) { this.treeViewDirectories.Nodes.Clear(); if (this.observer == null) { // 建立订阅关系 !!!!!!!!!!!!!!!!! this.observer = this.directories.Subscribe(outputDirectory); this.butStop.Enabled = true; this.butObserverSingle.Enabled = false; this.butObserveBuffered.Enabled = false; } } private void butObserveBuffered_Click(object sender, EventArgs e) { this.treeViewDirectories.Nodes.Clear(); if (this.observer == null) { // 建立订阅关系 !!!!!!!!!!!!!!!!! this.observer = this.bufferedDirectories.Subscribe(outputDirectories); this.butStop.Enabled = true; this.butObserverSingle.Enabled = false; this.butObserveBuffered.Enabled = false; } } private void outputDirectory(string path) { // We check to see if the handle is created because when // the form is disposing this may still be trying to observe. if (this.treeViewDirectories.IsHandleCreated) { this.treeViewDirectories.Nodes.Add(path); } } private void outputDirectories(IEnumerable<string> paths) { // We check to see if the handle is created because when // the form is disposing this may still be trying to observe. if (this.treeViewDirectories.IsHandleCreated) { try { this.treeViewDirectories.BeginUpdate(); foreach (var path in paths) { this.treeViewDirectories.Nodes.Add(path); } } finally { this.treeViewDirectories.EndUpdate(); } } } https://gist.github.com/973970