Documentation/Tutorials/RTMPClient

RTMP Client

Changes were made to RTMPClient in 0.8 that allow for more flexibility.

RTMPClient can be either extended or composed into another class. Either way, the class needs to implement some callback interfaces to handle stream events and exceptions.

Extending RTMPClient

import org.red5.server.api.service.IPendingServiceCallback;
import org.red5.server.net.rtmp.ClientExceptionHandler;
import org.red5.server.net.rtmp.INetStreamEventHandler;
import org.red5.server.api.event.IEventDispatcher; 

/**
 * Skeleton of a client built by extending RTMPClient and implementing the
 * callback interfaces named in the declaration. The body is elided.
 *
 * NOTE(review): RTMPClient, ISharedObjectListener, IPushableConsumer and
 * IPipeConnectionListener are not among the imports shown with this snippet;
 * confirm they are imported wherever this class is actually declared.
 */
public class RTMPClientExtended extends RTMPClient implements INetStreamEventHandler, IPendingServiceCallback, ClientExceptionHandler, IEventDispatcher, ISharedObjectListener, IPushableConsumer, IPipeConnectionListener
{
....
}

RTMPClient Service Callback Handlers

The handlers need to be set up on a class like so:

// Pass the same object (this) to each of the client's four handler setters.
rtmpClient.setServiceProvider(this);
rtmpClient.setExceptionHandler(this);
rtmpClient.setCallbackProvider(this);
rtmpClient.setStreamEventDispatcher(this);

Callback Result Handlers

Callback Result Handler

The resultReceived method determines the connection status; it works similarly to the Flash status event.

/**
 * Handles the result of a pending service call.
 * <p>
 * When the result is an ObjectMap whose "code" entry equals
 * StatusCodes.NC_CONNECT_SUCCESS, the state becomes STREAM_CREATING and
 * createStream(this) is issued. When the call was "createStream", an Integer
 * result is stored as the stream id and playback of fileName is started;
 * any other result disconnects and sets the state to STOPPED.
 *
 * @param call the pending call whose result has arrived
 */
public synchronized void resultReceived(IPendingServiceCall call) {
    Object result = call.getResult();

    if (result instanceof ObjectMap)
    {
        // Read the status code from the result map, the same way onStatus
        // does; the comparison below needs it and it was never declared.
        ObjectMap statusMap = (ObjectMap) result;
        String code = (String) statusMap.get("code");

        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
        {
            log.info("Connected");
            state = RTMPStreamState.STREAM_CREATING;
            createStream(this);
        }
    }
.....
    else {

        if ("createStream".equals(call.getServiceMethodName())) {

            state = RTMPStreamState.PLAYING;

            if (result instanceof Integer) {
                Integer streamIdInt = (Integer) result;
                streamId = streamIdInt.intValue();
                log.debug("createStream result stream id: {}", streamId);
                play(streamIdInt, fileName, start, duration);
            } else {
                // No usable stream id came back: give up on the connection.
                disconnect();
                state = RTMPStreamState.STOPPED;
            }

        }
    }
}

Callback Stream Status Handler

Here createStream(this); sets up the NetStream for playback and streaming. The next call traps the createStream result and is then able to play back the file by first collecting the streamId.

The stream event status handler is set up like so:

/**
 * Stream status callback. Casts the argument to an ObjectMap, reads its
 * "code", "description" and "details" entries as strings, and branches when
 * the code equals StatusCodes.NS_PLAY_START (the branch body is elided).
 *
 * @param obj the status object; cast to ObjectMap without a type check
 */
public void onStatus(Object obj)
{
   ObjectMap map = (ObjectMap) obj;
   String code = (String) map.get("code");
   // description and details are read here but not used in the visible part of this snippet.
   String description = (String) map.get("description");
   String details = (String) map.get("details");
                
   if (StatusCodes.NS_PLAY_START.equals(code))
   {
      .......

   }
}

Callback Error Handler

The client exception handler, which handles connection errors, looks like so:

/**
 * Client exception callback: logs the exception at error level.
 *
 * @param throwable the exception reported to this handler
 */
public void handleException(Throwable throwable)
{
    // Log the throwable itself rather than only getCause(): the cause may be
    // null, and logging just the cause drops the top-level message and trace.
    log.error("RTMP client exception", throwable);
}

Dispatch Event Handler

To handle NetStream events during streaming playback, which lets you trap the packets of the stream, implement dispatchEvent like so:

/**
 * Receives a dispatched event and, when it is an RTMP stream-data event with
 * a non-zero header size, copies it into a new Tag and hands it to the writer.
 * <p>
 * Events that are not IRTMPEvent, not IStreamData, or whose header size is 0
 * are logged at debug level and skipped. For VideoData and AudioData the
 * event timestamp is added to the matching running total (videoTs / audioTs)
 * and that total becomes the tag timestamp; other event types leave the tag
 * timestamp unset.
 *
 * @param event the dispatched event
 * @throws RuntimeException wrapping any exception thrown by writer.writeTag
 */
public void dispatchEvent(IEvent event) {
    final boolean isRtmp = event instanceof IRTMPEvent;
    if (!isRtmp) {
        log.debug("skipping non rtmp event: " + event);
        return;
    }

    final IRTMPEvent packet = (IRTMPEvent) event;
    /*
     * Optional trace of every RTMP event (disabled):
     *
     * if (log.isDebugEnabled()) {
     *     log.debug("rtmp event: " + packet.getHeader() + ", "
     *             + packet.getClass().getSimpleName());
     * }
     */
    final boolean carriesStreamData = packet instanceof IStreamData;
    if (!carriesStreamData) {
        log.debug("skipping non stream data");
        return;
    }
    if (packet.getHeader().getSize() == 0) {
        log.debug("skipping event where size == 0");
        return;
    }

    final ITag flvTag = new Tag();
    flvTag.setDataType(packet.getDataType());

    // Video and audio keep separate running timestamp totals.
    if (packet instanceof VideoData) {
        videoTs += packet.getTimestamp();
        flvTag.setTimestamp(videoTs);
    } else if (packet instanceof AudioData) {
        audioTs += packet.getTimestamp();
        flvTag.setTimestamp(audioTs);
    }

    final ByteBuffer payload = ((IStreamData) packet).getData().asReadOnlyBuffer();
    final int payloadSize = payload.limit();
    flvTag.setBodySize(payloadSize);
    flvTag.setBody(payload);

    try {
        writer.writeTag(flvTag);
    } catch (Exception writeFailure) {
        throw new RuntimeException(writeFailure);
    }
}

The connect timeout is still hardcoded in RTMPClient (specifically, the CONNECTOR_WORKER_TIMEOUT constant) unless the startConnector method is reimplemented in an extended class. RTMPClient needs a setter method to set the timeout dynamically.

Examples

Playback Example

/**
 * Playback example: handles the result of a pending service call.
 * <p>
 * When the result is an ObjectMap whose "code" entry equals
 * StatusCodes.NC_CONNECT_SUCCESS, the state becomes STREAM_CREATING and
 * createStream(this) is issued. When the call was "createStream", an Integer
 * result is stored as the stream id and playback of "stream.flv" is started;
 * any other result disconnects and sets the state to STOPPED.
 *
 * @param call the pending call whose result has arrived
 */
public synchronized void resultReceived(IPendingServiceCall call) {
    Object result = call.getResult();

    if (result instanceof ObjectMap)
    {
        // Read the status code from the result map, the same way onStatus
        // does; the comparison below needs it and it was never declared.
        ObjectMap statusMap = (ObjectMap) result;
        String code = (String) statusMap.get("code");

        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
        {
            log.info("Connected");
            state = RTMPStreamState.STREAM_CREATING;
            createStream(this);
        }
    }
.....
    else {

        if ("createStream".equals(call.getServiceMethodName())) {

            state = RTMPStreamState.PLAYING;

            if (result instanceof Integer) {
                Integer streamIdInt = (Integer) result;
                streamId = streamIdInt.intValue();
                log.debug("createStream result stream id: {}", streamId);
                // Four-argument form (stream id, name, start, length), matching
                // the play call shown in the handler section of this page.
                // NOTE(review): -1 is assumed to mean "play to the end" - confirm
                // against the RTMPClient.play documentation.
                play(streamId, "stream.flv", 0, -1);
            } else {
                // No usable stream id came back: give up on the connection.
                disconnect();
                state = RTMPStreamState.STOPPED;
            }

        }
    }
}

Publish Example

/**
 * Publish example: handles the result of a pending service call.
 * <p>
 * When the result is an ObjectMap whose "code" entry equals
 * StatusCodes.NC_CONNECT_SUCCESS, the state becomes STREAM_CREATING and
 * createStream(this) is issued. When the call was "createStream", an Integer
 * result is stored as the stream id and a "live" publish of "publishtest" is
 * requested; any other result disconnects and sets the state to STOPPED.
 *
 * @param call the pending call whose result has arrived
 */
public synchronized void resultReceived(IPendingServiceCall call) {
    Object result = call.getResult();

    if (result instanceof ObjectMap)
    {
        // Read the status code from the result map, the same way onStatus
        // does; the comparison below needs it and it was never declared.
        ObjectMap statusMap = (ObjectMap) result;
        String code = (String) statusMap.get("code");

        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
        {
            log.info("Connected");
            state = RTMPStreamState.STREAM_CREATING;
            createStream(this);
        }
    }
.....
    else {

        if ("createStream".equals(call.getServiceMethodName())) {

            // NOTE(review): the state is set to PLAYING even though this
            // example publishes - confirm that is the intended state here.
            state = RTMPStreamState.PLAYING;

            if (result instanceof Integer) {
                Integer streamIdInt = (Integer) result;
                streamId = streamIdInt.intValue();
                log.debug("createStream result stream id: {}", streamId);
                publish("publishtest", streamId, "live", this);
            } else {
                // No usable stream id came back: give up on the connection.
                disconnect();
                state = RTMPStreamState.STOPPED;
            }

        }
    }
}
/**
 * Publish example: stream status callback.
 * <p>
 * Casts the argument to an ObjectMap and reads its "code", "description" and
 * "details" entries. When the code equals StatusCodes.NS_PUBLISH_START, the
 * state becomes PUBLISH_START, an FLVService is created, the file
 * dir + "/" + fileName is opened through it, and every event dequeued from
 * the resulting FileStreamSource is wrapped in an RTMPMessage and sent with
 * publishStreamData. The stream is unpublished once the source has no more
 * events. Any exception raised while doing so is logged, not rethrown.
 *
 * @param obj the status object; cast to ObjectMap without a type check
 */
public void onStatus(Object obj)
{
   ObjectMap map = (ObjectMap) obj;
   String code = (String) map.get("code");
   String description = (String) map.get("description");
   String details = (String) map.get("details");

   if (StatusCodes.NS_PUBLISH_START.equals(code))
   {
      state = RTMPStreamState.PUBLISH_START;
      log.debug("{} for {}", new Object[]{code,details});

      service = new FLVService();
      service.setSerializer(new Serializer());
      service.setDeserializer(new Deserializer());

      log.info("Started Publishing");

      // Read In File And Publish The Data !!
      try {

         File f = new File(dir + "/" + fileName);
         log.debug("test: {}", f);

         IFLV flv = (IFLV) service.getStreamableFile(f);
         flv.setCache(NoCacheImpl.getInstance());

         ITagReader reader = flv.getReader();

         FileStreamSource src = new FileStreamSource(reader);

         while (src.hasMore())
         {
            IRTMPEvent event = src.dequeue();
            RTMPMessage rtmpMsg = new RTMPMessage();
            // Use the event just dequeued; the snippet previously passed an
            // undefined name (rtmpEvent) here.
            rtmpMsg.setBody(event);

            publishStreamData(streamId, rtmpMsg);
         }

         this.getRTMPClient().unpublish(streamId);

      } catch (Exception ex) {
         // Log the exception itself: getCause() may be null, in which case
         // calling toString() on it would throw inside the handler.
         log.error("Error while publishing stream data", ex);
      }

   }
}

Shared Object Example

/**
 * Shared object example: handles the result of a pending service call.
 * <p>
 * When the result is an ObjectMap whose "code" entry equals
 * StatusCodes.NC_CONNECT_SUCCESS, the state becomes STREAM_CREATING,
 * createStream(this) is issued, and the shared object obtained with
 * getSharedObject("server", true) gets this object as listener, receives one
 * attribute inside a beginUpdate/endUpdate pair, and is then cleared. When
 * the call was "createStream", an Integer result is stored as the stream id;
 * any other result disconnects and sets the state to STOPPED.
 *
 * @param call the pending call whose result has arrived
 */
public synchronized void resultReceived(IPendingServiceCall call) {
    Object result = call.getResult();

    if (result instanceof ObjectMap)
    {
        // Read the status code from the result map, the same way onStatus
        // does; the comparison below needs it and it was never declared.
        // Named statusMap so it does not clash with the attribute map below.
        ObjectMap statusMap = (ObjectMap) result;
        String code = (String) statusMap.get("code");

        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
        {
            log.info("Connected");
            state = RTMPStreamState.STREAM_CREATING;
            createStream(this);

            IClientSharedObject obj = getSharedObject("server", true);
            obj.addSharedObjectListener(this);

            Map<String,Object> map = new HashMap<String,Object>();
            map.put("key","value");

            // The attribute change is wrapped in a beginUpdate/endUpdate pair.
            obj.beginUpdate();
            obj.setAttributes(map);
            obj.endUpdate();
            obj.clear();
        }
    }
.....
    else {

        if ("createStream".equals(call.getServiceMethodName())) {

            state = RTMPStreamState.PLAYING;

            if (result instanceof Integer) {
                Integer streamIdInt = (Integer) result;
                streamId = streamIdInt.intValue();
                log.debug("createStream result stream id: {}", streamId);

            } else {
                // No usable stream id came back: give up on the connection.
                disconnect();
                state = RTMPStreamState.STOPPED;
            }

        }
    }
}

Shared Object Listeners

/** Shared object callback for a clear; only logs a debug message. */
public void onSharedObjectClear(ISharedObjectBase so)
        {
                log.debug("Shared Object Clear");
        }

        /** Shared object callback for a connect; only logs a debug message. */
        public void onSharedObjectConnect(ISharedObjectBase so)
        {
                log.debug("Shared Object Connect");
        }

        /** Shared object callback for a deleted key; only logs a debug message. */
        public void onSharedObjectDelete(ISharedObjectBase so, String key)
        {
                log.debug("Shared Object Delete");
        }

        /** Shared object callback for a disconnect; only logs a debug message. */
        public void onSharedObjectDisconnect(ISharedObjectBase so)
        {
                log.debug("Shared Object Disconnect");
        }

        /** Shared object callback for a send with a method name and parameters; only logs a debug message. */
        public void onSharedObjectSend(ISharedObjectBase so, String method, List params)
        {
                log.debug("Shared Object Send");
        }

        /** Shared object callback for an update carrying an IAttributeStore; only logs a debug message. */
        public void onSharedObjectUpdate(ISharedObjectBase so, IAttributeStore values)
        {
                log.debug("Shared Object Update");
        }

        /** Shared object callback for an update carrying a map of values; only logs a debug message. */
        public void onSharedObjectUpdate(ISharedObjectBase so, Map<String,Object> values)
        {
                // Message spelling corrected so all three update overloads log the same text.
                log.debug("Shared Object Update");
        }

        /** Shared object callback for an update of a single key; only logs a debug message. */
        public void onSharedObjectUpdate(ISharedObjectBase so, String key, Object value)
        {
                // Message spelling corrected so all three update overloads log the same text.
                log.debug("Shared Object Update");
        }

Callback Example

/**
 * Callback example: handles the result of a pending service call.
 * <p>
 * When the result is an ObjectMap whose "code" entry equals
 * StatusCodes.NC_CONNECT_SUCCESS, the state becomes STREAM_CREATING,
 * createStream(this) is issued, and "service.CallService" is invoked with
 * two string arguments and this object as the callback. When the call was
 * "createStream", an Integer result is stored as the stream id; any other
 * result disconnects and sets the state to STOPPED.
 *
 * @param call the pending call whose result has arrived
 */
public synchronized void resultReceived(IPendingServiceCall call) {
    Object result = call.getResult();

    if (result instanceof ObjectMap)
    {
        // Read the status code from the result map, the same way onStatus
        // does; the comparison below needs it and it was never declared.
        ObjectMap statusMap = (ObjectMap) result;
        String code = (String) statusMap.get("code");

        if (StatusCodes.NC_CONNECT_SUCCESS.equals(code))
        {
            log.info("Connected");
            state = RTMPStreamState.STREAM_CREATING;
            createStream(this);

            invoke("service.CallService", new Object[]{"arg1","arg2"}, this);
        }
    }
.....
    else {

        if ("createStream".equals(call.getServiceMethodName())) {

            state = RTMPStreamState.PLAYING;

            if (result instanceof Integer) {
                Integer streamIdInt = (Integer) result;
                streamId = streamIdInt.intValue();
                log.debug("createStream result stream id: {}", streamId);

            } else {
                // No usable stream id came back: give up on the connection.
                disconnect();
                state = RTMPStreamState.STOPPED;
            }

        }
    }
}